]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
WIP: data_modeling refactored docs-python-refac-hoq9jw/deployments/8532 python-refactoring-modeling
authorAleš Mrázek <ales.mrazek@nic.cz>
Tue, 13 Jan 2026 00:28:00 +0000 (01:28 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 6 Feb 2026 15:04:26 +0000 (16:04 +0100)
28 files changed:
python/knot_resolver/config/__init__.py [new file with mode: 0644]
python/knot_resolver/config/model/__init__.py [new file with mode: 0644]
python/knot_resolver/config/model/config_model.py [new file with mode: 0644]
python/knot_resolver/config/model/types/__init__.py [new file with mode: 0644]
python/knot_resolver/config/model/types/float_types.py [new file with mode: 0644]
python/knot_resolver/config/model/types/integer_types.py [new file with mode: 0644]
python/knot_resolver/config/model/types/string_types.py [new file with mode: 0644]
python/knot_resolver/config/templates/__init__.py [new file with mode: 0644]
python/knot_resolver/config/templates/policy-loader.lua.j2 [new file with mode: 0644]
python/knot_resolver/config/templates/worker.lua.j2 [new file with mode: 0644]
python/knot_resolver/utils/modeling/__init__.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/errors.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/model_node.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/parsing.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/types/__init__.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/types/base_float_types.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/types/base_generic_types.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/types/base_integer_types.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/types/base_string_types.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/types/base_types.py [new file with mode: 0644]
python/knot_resolver/utils/modeling/types/inspect.py [new file with mode: 0644]
tests/python/knot_resolver/utils/modeling/test_errors.py [new file with mode: 0644]
tests/python/knot_resolver/utils/modeling/test_parsing.py [new file with mode: 0644]
tests/python/knot_resolver/utils/modeling/types/test_base_float_types.py [new file with mode: 0644]
tests/python/knot_resolver/utils/modeling/types/test_base_generic_types.py [new file with mode: 0644]
tests/python/knot_resolver/utils/modeling/types/test_base_integer_types.py [new file with mode: 0644]
tests/python/knot_resolver/utils/modeling/types/test_base_string_types.py [new file with mode: 0644]
tests/python/knot_resolver/utils/modeling/types/test_inspect.py [new file with mode: 0644]

diff --git a/python/knot_resolver/config/__init__.py b/python/knot_resolver/config/__init__.py
new file mode 100644 (file)
index 0000000..75cedc5
--- /dev/null
@@ -0,0 +1,5 @@
+from .model.config_model import KresConfigModel
+
+__all__ = [
+    "KresConfigModel",
+]
diff --git a/python/knot_resolver/config/model/__init__.py b/python/knot_resolver/config/model/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/python/knot_resolver/config/model/config_model.py b/python/knot_resolver/config/model/config_model.py
new file mode 100644 (file)
index 0000000..f0637ae
--- /dev/null
@@ -0,0 +1,5 @@
+from knot_resolver.utils.modeling import ModelNode
+
+
class KresConfigModel(ModelNode):
    """Root of the Knot Resolver configuration model tree."""
diff --git a/python/knot_resolver/config/model/types/__init__.py b/python/knot_resolver/config/model/types/__init__.py
new file mode 100644 (file)
index 0000000..3a86656
--- /dev/null
@@ -0,0 +1,22 @@
+from .float_types import FloatNonNegative
+from .integer_types import (
+    Integer0_32,
+    Integer0_512,
+    Integer0_65535,
+    IntegerNonNegative,
+    IntegerPositive,
+    Percent,
+    PortNumber,
+)
+# TODO: re-export names from .string_types once string types are defined there
+
+__all__ = [
+    "FloatNonNegative",
+    "Integer0_32",
+    "Integer0_512",
+    "Integer0_65535",
+    "IntegerNonNegative",
+    "IntegerPositive",
+    "Percent",
+    "PortNumber",
+]
diff --git a/python/knot_resolver/config/model/types/float_types.py b/python/knot_resolver/config/model/types/float_types.py
new file mode 100644 (file)
index 0000000..576a0a7
--- /dev/null
@@ -0,0 +1,5 @@
+from knot_resolver.utils.modeling.types import BaseFloatRange
+
+
class FloatNonNegative(BaseFloatRange):
    """Float greater than or equal to 0.0."""

    _min: float = 0.0
diff --git a/python/knot_resolver/config/model/types/integer_types.py b/python/knot_resolver/config/model/types/integer_types.py
new file mode 100644 (file)
index 0000000..6f8ffff
--- /dev/null
@@ -0,0 +1,36 @@
+# ruff: noqa: N801
+
+from knot_resolver.utils.modeling.types import BaseIntegerRange
+
+
class Integer0_32(BaseIntegerRange):
    """Integer restricted to the inclusive range [0, 32]."""

    _min: int = 0
    _max: int = 32
+
+
class Integer0_512(BaseIntegerRange):
    """Integer restricted to the inclusive range [0, 512]."""

    _min: int = 0
    _max: int = 512
+
+
class Integer0_65535(BaseIntegerRange):
    """Integer restricted to the inclusive range [0, 65535]."""

    _min: int = 0
    _max: int = 65_535
+
+
class IntegerNonNegative(BaseIntegerRange):
    """Integer greater than or equal to zero (no upper bound)."""

    _min: int = 0
+
+
class IntegerPositive(BaseIntegerRange):
    """Integer greater than or equal to one (no upper bound)."""

    _min: int = 1
+
+
class Percent(BaseIntegerRange):
    """Percentage expressed as an integer in [0, 100]."""

    _min: int = 0
    _max: int = 100
+
+
class PortNumber(BaseIntegerRange):
    """Network port number: integer in [1, 65535]."""

    _min: int = 1
    _max: int = 65_535
diff --git a/python/knot_resolver/config/model/types/string_types.py b/python/knot_resolver/config/model/types/string_types.py
new file mode 100644 (file)
index 0000000..277c5bc
--- /dev/null
@@ -0,0 +1 @@
+from knot_resolver.utils.modeling.types import BaseString
diff --git a/python/knot_resolver/config/templates/__init__.py b/python/knot_resolver/config/templates/__init__.py
new file mode 100644 (file)
index 0000000..9c5d5ed
--- /dev/null
@@ -0,0 +1,33 @@
+from pathlib import Path
+
+from jinja2 import Environment, FileSystemLoader, StrictUndefined, Template
+
+
+def _get_templates_path() -> Path:
+    templates_path = Path(__file__).resolve().parent
+    if not templates_path.exists():
+        raise FileNotFoundError(templates_path)
+    if not templates_path.is_dir():
+        raise NotADirectoryError(templates_path)
+    return templates_path
+
+
+_TEMPLATES_PATH: Path = _get_templates_path()
+
+
def _load_template_from_str(template: str) -> Template:
    """Compile a template string, using the templates directory as the loader root."""
    # StrictUndefined makes missing variables fail loudly instead of rendering empty.
    env = Environment(  # noqa: S701
        trim_blocks=True,
        lstrip_blocks=True,
        loader=FileSystemLoader(_TEMPLATES_PATH),
        undefined=StrictUndefined,
    )
    return env.from_string(template)
+
+
def _import_template(template: str) -> Template:
    """Read the template file named *template* from the templates directory and compile it.

    Fix: the original reused the `template` parameter for the file contents,
    shadowing the file name with the file body; a separate local keeps the two
    meanings distinct.
    """
    template_file = _TEMPLATES_PATH / template
    with template_file.open() as file:
        template_text = file.read()
    return _load_template_from_str(template_text)
+
+
# Templates are loaded eagerly at import time so that a broken bundled template
# surfaces immediately rather than on first render.
WORKER_TEMPLATE: Template = _import_template("worker.lua.j2")

POLICY_LOADER_TEMPLATE: Template = _import_template("policy-loader.lua.j2")
diff --git a/python/knot_resolver/config/templates/policy-loader.lua.j2 b/python/knot_resolver/config/templates/policy-loader.lua.j2
new file mode 100644 (file)
index 0000000..4f8d1ad
--- /dev/null
@@ -0,0 +1,23 @@
{% if not cfg.lua.policy_script_only %}

ffi = require('ffi')
local C = ffi.C



{% endif %}

-- LUA section --------------------------------------
-- Custom Lua code cannot be validated

{% if cfg.lua.policy_script_file %}
{# Fix: 'include' inlines the file's rendered content; 'import' only binds a
   template namespace object, so printing it would emit its repr, not the script. #}
{% include cfg.lua.policy_script_file %}
{% endif %}

{% if cfg.lua.policy_script %}
{{ cfg.lua.policy_script }}
{% endif %}

-- exit properly
quit()
diff --git a/python/knot_resolver/config/templates/worker.lua.j2 b/python/knot_resolver/config/templates/worker.lua.j2
new file mode 100644 (file)
index 0000000..6084c23
--- /dev/null
@@ -0,0 +1,20 @@
{% if not cfg.lua.script_only %}

ffi = require('ffi')
local C = ffi.C



{% endif %}

-- LUA section --------------------------------------
-- Custom Lua code cannot be validated

{% if cfg.lua.script_file %}
{# Fix: 'include' inlines the file's rendered content; 'import' only binds a
   template namespace object, so printing it would emit its repr, not the script. #}
{% include cfg.lua.script_file %}
{% endif %}

{% if cfg.lua.script %}
{{ cfg.lua.script }}
{% endif %}
diff --git a/python/knot_resolver/utils/modeling/__init__.py b/python/knot_resolver/utils/modeling/__init__.py
new file mode 100644 (file)
index 0000000..2ee389f
--- /dev/null
@@ -0,0 +1,9 @@
+from .model_node import ModelNode
+from .parsing import parse_json, parse_yaml, try_to_parse
+
+__all__ = [
+    "ModelNode",
+    "parse_json",
+    "parse_yaml",
+    "try_to_parse",
+]
diff --git a/python/knot_resolver/utils/modeling/errors.py b/python/knot_resolver/utils/modeling/errors.py
new file mode 100644 (file)
index 0000000..fcc6599
--- /dev/null
@@ -0,0 +1,117 @@
+from __future__ import annotations
+
+from knot_resolver.errors import BaseKresError
+
+
class DataModelingError(BaseKresError):
    """Base exception class for all data modeling errors."""

    def __init__(self, msg: str, error_path: str = "") -> None:
        super().__init__()
        # Prefix the message with its configuration-tree path, e.g. "[/path] msg".
        prefix = f"[{error_path}] " if error_path else ""
        self._msg = prefix + msg
        self._error_path = error_path

    def __str__(self) -> str:
        return self._msg
+
+
class DataDescriptionError(DataModelingError):
    """Exception class for data description errors."""

    def __init__(self, msg: str, error_path: str = "") -> None:
        super().__init__(f"description error: {msg}", error_path)
+
+
class DataAnnotationError(DataModelingError):
    """Exception class for data annotation errors."""

    def __init__(self, msg: str, error_path: str = "") -> None:
        super().__init__(f"annotation error: {msg}", error_path)
+
+
class DataParsingError(DataModelingError):
    """Exception class for data parsing errors."""

    def __init__(self, msg: str, error_path: str = "") -> None:
        super().__init__(f"parsing error: {msg}", error_path)
+
+
class DataTypeError(DataModelingError):
    """Exception class for data type errors."""

    def __init__(self, msg: str, error_path: str = "") -> None:
        super().__init__(f"type error: {msg}", error_path)
+
+
class DataValueError(DataModelingError):
    """Exception class for data value errors."""

    def __init__(self, msg: str, error_path: str = "") -> None:
        super().__init__(f"value error: {msg}", error_path)
+
+
class DataValidationError(DataModelingError):
    """
    Exception class for data validation errors.

    This exception is used as parent for other data modeling errors.
    """

    def __init__(self, msg: str, error_path: str, child_errors: list[DataModelingError] | None = None) -> None:
        super().__init__(msg, error_path)
        self._child_errors: list[DataModelingError] = child_errors if child_errors is not None else []

    def recursive_msg(self, indentation: int = 0) -> str:
        """Render this error and its children as an indented message tree."""
        lines: list[str] = []

        # The outermost call adds a header and starts the tree at one indent level.
        if indentation == 0:
            lines.append("Data validation error detected:")
            indentation = 1

        pad = "    " * indentation
        lines.append(f"{pad}{self._msg}")

        for child in self._child_errors:
            if isinstance(child, DataValidationError):
                lines.append(child.recursive_msg(indentation + 1))
            else:
                # Leaf errors are indented one level deeper than this node.
                lines.append(f"{pad}    {child}")
        return "\n".join(lines)

    def __str__(self) -> str:
        return self.recursive_msg()
+
+
class AggrDataValidationError(DataValidationError):
    """
    Exception class for aggregation of data validation errors.

    This exception is used to aggregate other data modeling errors.
    """

    def __init__(self, error_path: str, child_errors: list[DataModelingError]) -> None:
        super().__init__("error due to lower level error", error_path, child_errors)

    def recursive_msg(self, indentation: int = 0) -> str:
        """Render only the aggregated children (the wrapper message itself is omitted)."""
        lines: list[str] = []
        # Only the outermost call prints a header and shifts children by one level.
        extra = 1 if indentation == 0 else 0
        if extra:
            lines.append("Data validation errors detected:")

        for child in self._child_errors:
            if isinstance(child, DataValidationError):
                lines.append(child.recursive_msg(indentation + extra))
            else:
                # Leaf errors always get a fixed single level of indentation.
                lines.append(f"    {child}")
        return "\n".join(lines)
diff --git a/python/knot_resolver/utils/modeling/model_node.py b/python/knot_resolver/utils/modeling/model_node.py
new file mode 100644 (file)
index 0000000..66023d0
--- /dev/null
@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+
class ModelNode:
    """A node of the configuration model tree.

    Fix: the original carried an empty docstring; documented the class and its
    attributes instead.

    Attributes (private):
        _source: raw parsed data backing this node; a falsy source becomes {}.
        _tree_path: path of this node within the configuration tree (default "/").
        _base_path: filesystem base for resolving relative paths (default cwd-relative Path()).
    """

    def __init__(self, source: dict[Any, Any], tree_path: str = "/", base_path: Path = Path()):
        # Normalize falsy input (None/{}) to a fresh empty dict.
        self._source = source if source else {}
        self._tree_path = tree_path
        self._base_path = base_path

    def validate(self) -> None:
        """Validate this node; the base implementation accepts everything."""

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        """Return the JSON schema of this node; subclasses must implement it."""
        raise NotImplementedError
diff --git a/python/knot_resolver/utils/modeling/parsing.py b/python/knot_resolver/utils/modeling/parsing.py
new file mode 100644 (file)
index 0000000..8425b73
--- /dev/null
@@ -0,0 +1,107 @@
+from __future__ import annotations
+
+import json
+from enum import Enum, auto
+from typing import TYPE_CHECKING, Any
+
+import yaml
+from yaml.constructor import ConstructorError
+
+from knot_resolver.utils.modeling.errors import DataParsingError
+
+if TYPE_CHECKING:
+    from yaml.nodes import MappingNode
+
+
+def _json_raise_duplicates(pairs: list[tuple[Any, Any]]) -> dict[Any, Any]:
+    """
+    JSON hook used in 'json.loads()' that detects duplicate keys in the parsed data.
+
+    The code for this hook was highly inspired by: https://stackoverflow.com/q/14902299/12858520
+    """
+    mapping: dict[Any, Any] = {}
+    for key, value in pairs:
+        if key in mapping:
+            msg = f"duplicate key detected: {key}"
+            raise DataParsingError(msg)
+        mapping[key] = value
+    return mapping
+
+
class _YAMLRaiseDuplicatesLoader(yaml.SafeLoader):
    """
    YAML loader used in 'yaml.loads()' that detects duplicate keys in the parsed data.

    The code for this loader was highly inspired by: https://gist.github.com/pypt/94d747fe5180851196eb
    The loader extends yaml.SafeLoader, so it should be safe, even though the linter reports unsafe-yaml-load (S506).
    More about safe loader: https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
    """

    def construct_mapping(self, node: MappingNode, deep: bool = False) -> dict[Any, Any]:
        """Build a dict from a YAML mapping node, rejecting unhashable and duplicate keys."""
        mapping: dict[Any, Any] = {}
        for key_node, value_node in node.value:
            key = self.construct_object(key_node, deep=deep)
            # we need to check, that the key object can be used in a hash table
            try:
                _ = hash(key)
            except TypeError as exc:
                # Re-raise as ConstructorError so the message carries YAML source positions.
                raise ConstructorError(
                    "while constructing a mapping",
                    node.start_mark,
                    f"found unacceptable key ({exc})",
                    key_node.start_mark,
                ) from exc

            # check for duplicate keys
            if key in mapping:
                # start_mark points at the offending key's position in the input.
                msg = f"duplicate key detected: {key_node.start_mark}"
                raise DataParsingError(msg)
            value = self.construct_object(value_node, deep=deep)
            mapping[key] = value
        return mapping
+
+
class DataFormat(Enum):
    """Supported configuration data formats."""

    YAML = auto()
    JSON = auto()

    def loads(self, text: str) -> dict[Any, Any]:
        """Load data from string in data format and return the data in dictionary."""
        if self is DataFormat.JSON:
            return json.loads(text, object_pairs_hook=_json_raise_duplicates)
        if self is DataFormat.YAML:
            return yaml.load(text, Loader=_YAMLRaiseDuplicatesLoader)  # noqa: S506
        msg = f"parsing data from '{self}' format is not implemented"
        raise NotImplementedError(msg)

    def dumps(self, data: dict[Any, Any], indent: int | None = None) -> str:
        """Dump dictionary data to string in required data format."""
        if self is DataFormat.JSON:
            return json.dumps(data, indent=indent)
        if self is DataFormat.YAML:
            return yaml.safe_dump(data, indent=indent)
        msg = f"exporting data to '{self}' format is not implemented"
        raise NotImplementedError(msg)
+
+
def parse_yaml(data: str) -> dict[Any, Any]:
    """Parse YAML string and return the data in dictionary."""
    fmt = DataFormat.YAML
    return fmt.loads(data)
+
+
def parse_json(data: str) -> dict[Any, Any]:
    """Parse JSON string and return the data in dictionary."""
    fmt = DataFormat.JSON
    return fmt.loads(data)
+
+
def try_to_parse(data: str) -> dict[Any, Any]:
    """Attempt to parse data string as a JSON or YAML and return its dictionary."""
    try:
        return parse_json(data)
    except json.JSONDecodeError:
        try:
            return parse_yaml(data)
        except yaml.YAMLError as e:
            # YAML parsing error should be sufficient because the JSON can be parsed by the YAML parser.
            # We should receive a helpful error message for JSON as well.
            # Fix: pass a string, not the exception object — DataParsingError's
            # constructor is typed as `msg: str` and formats it into the message.
            raise DataParsingError(str(e)) from e
diff --git a/python/knot_resolver/utils/modeling/types/__init__.py b/python/knot_resolver/utils/modeling/types/__init__.py
new file mode 100644 (file)
index 0000000..b29ed67
--- /dev/null
@@ -0,0 +1,23 @@
+from .base_float_types import BaseFloat, BaseFloatRange
+from .base_generic_types import ListOrItem
+from .base_integer_types import BaseInteger, BaseIntegerRange
+from .base_string_types import (
+    BaseString,
+    BaseStringLength,
+    BaseStringPattern,
+    BaseUnit,
+)
+from .base_types import NoneType
+
+__all__ = [
+    "BaseFloat",
+    "BaseFloatRange",
+    "BaseInteger",
+    "BaseIntegerRange",
+    "BaseString",
+    "BaseStringLength",
+    "BaseStringPattern",
+    "BaseUnit",
+    "ListOrItem",
+    "NoneType",
+]
diff --git a/python/knot_resolver/utils/modeling/types/base_float_types.py b/python/knot_resolver/utils/modeling/types/base_float_types.py
new file mode 100644 (file)
index 0000000..dafa96d
--- /dev/null
@@ -0,0 +1,52 @@
+from __future__ import annotations
+
+from typing import Any
+
+from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError
+
+from .base_types import BaseType
+
+
class BaseFloat(BaseType):
    """Base class to work with float value."""

    def validate(self) -> None:
        """Ensure the wrapped value is a real number; bool is explicitly rejected."""
        # bool is a subclass of int, so it must be excluded separately.
        is_number = isinstance(self._value, (float, int)) and not isinstance(self._value, bool)
        if is_number:
            return
        msg = (
            f"Unexpected value for '{type(self)}'."
            f" Expected float, got '{self._value}' with type '{type(self._value)}'"
        )
        raise DataTypeError(msg, self._tree_path)

    def __int__(self) -> int:
        return int(self._value)

    def __float__(self) -> float:
        return float(self._value)

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        return {"type": "number"}
+
+
class BaseFloatRange(BaseFloat):
    """Float restricted to an inclusive range via optional `_min`/`_max` class attributes."""

    _min: float
    _max: float

    def validate(self) -> None:
        super().validate()
        # `_min`/`_max` are optional class attributes, hence the hasattr checks.
        if hasattr(self, "_min") and (self._value < self._min):
            # Consistency fix: dropped the stray trailing period so both range
            # messages are worded identically.
            msg = f"value {self._value} is lower than the minimum {self._min}"
            raise DataValueError(msg, self._tree_path)
        if hasattr(self, "_max") and (self._value > self._max):
            msg = f"value {self._value} is higher than the maximum {self._max}"
            raise DataValueError(msg, self._tree_path)

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        typ: dict[str, Any] = {"type": "number"}
        if hasattr(cls, "_min"):
            typ["minimum"] = cls._min
        if hasattr(cls, "_max"):
            typ["maximum"] = cls._max
        return typ
diff --git a/python/knot_resolver/utils/modeling/types/base_generic_types.py b/python/knot_resolver/utils/modeling/types/base_generic_types.py
new file mode 100644 (file)
index 0000000..260af0f
--- /dev/null
@@ -0,0 +1,33 @@
+from __future__ import annotations
+
+from typing import Any, Generic, List, TypeVar, Union
+
+from .base_types import BaseType
+
+T = TypeVar("T")
+
+
class BaseGenericTypeWrapper(Generic[T], BaseType):
    """Marker base for model value types parametrized by an inner type (e.g. ListOrItem[T]).

    Fix: the original carried an empty docstring.
    """
+
+
class ListOrItem(BaseGenericTypeWrapper[Union[List[T], T]]):
    """Accepts either a single item or a list of items and exposes a uniform list view.

    Fix: the original carried an empty docstring.
    """

    def _get_list(self) -> list[T]:
        # A bare item is promoted to a one-element list.
        return self._value if isinstance(self._value, list) else [self._value]

    def validate(self) -> None:
        """Building the list view is the only check performed here."""
        self._get_list()

    def __getitem__(self, index: Any) -> T:
        return self._get_list()[index]

    def to_std(self) -> list[T]:
        """Return the value normalized to a plain list."""
        return self._get_list()

    def __len__(self) -> int:
        return len(self._get_list())

    def serialize(self) -> list[T] | T:
        """Return the value in its original shape (single item or list)."""
        return self._value
diff --git a/python/knot_resolver/utils/modeling/types/base_integer_types.py b/python/knot_resolver/utils/modeling/types/base_integer_types.py
new file mode 100644 (file)
index 0000000..ece7b08
--- /dev/null
@@ -0,0 +1,49 @@
+from __future__ import annotations
+
+from typing import Any
+
+from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError
+
+from .base_types import BaseType
+
+
class BaseInteger(BaseType):
    """Base class to work with integer value."""

    def validate(self) -> None:
        """Ensure the wrapped value is an int; bool is explicitly rejected."""
        # bool is a subclass of int, so it must be excluded separately.
        if not isinstance(self._value, int) or isinstance(self._value, bool):
            # Consistency fix: added the missing period after the type name to
            # match the wording used by BaseFloat/BaseString.
            msg = (
                f"Unexpected value for '{type(self)}'."
                f" Expected integer, got '{self._value}' with type '{type(self._value)}'"
            )
            raise DataTypeError(msg, self._tree_path)

    def __int__(self) -> int:
        return int(self._value)

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        return {"type": "integer"}
+
+
class BaseIntegerRange(BaseInteger):
    """Integer restricted to an inclusive range via optional `_min`/`_max` class attributes."""

    _min: int
    _max: int

    def validate(self) -> None:
        super().validate()
        # `_min`/`_max` are optional class attributes, hence the hasattr checks.
        if hasattr(self, "_min") and (self._value < self._min):
            # Consistency fix: dropped the stray trailing period so both range
            # messages are worded identically.
            msg = f"value {self._value} is lower than the minimum {self._min}"
            raise DataValueError(msg, self._tree_path)
        if hasattr(self, "_max") and (self._value > self._max):
            msg = f"value {self._value} is higher than the maximum {self._max}"
            raise DataValueError(msg, self._tree_path)

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        typ: dict[str, Any] = {"type": "integer"}
        if hasattr(cls, "_min"):
            typ["minimum"] = cls._min
        if hasattr(cls, "_max"):
            typ["maximum"] = cls._max
        return typ
diff --git a/python/knot_resolver/utils/modeling/types/base_string_types.py b/python/knot_resolver/utils/modeling/types/base_string_types.py
new file mode 100644 (file)
index 0000000..c1e37a2
--- /dev/null
@@ -0,0 +1,113 @@
+from __future__ import annotations
+
+import re
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from knot_resolver.utils.modeling.errors import DataTypeError, DataValueError
+
+from .base_types import BaseType
+
+if TYPE_CHECKING:
+    from re import Pattern
+
+
class BaseString(BaseType):
    """Base class to work with string value."""

    def validate(self) -> None:
        # NOTE(review): int values are accepted alongside str (bool is rejected,
        # being an int subclass) — presumably so numeric scalars can be coerced
        # to strings later; confirm subclasses calling str methods handle ints.
        if not isinstance(self._value, (str, int)) or isinstance(self._value, bool):
            msg = (
                f"Unexpected value for '{type(self)}'."
                f" Expected string, got '{self._value}' with type '{type(self._value)}'"
            )
            raise DataTypeError(msg, self._tree_path)

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        return {"type": "string"}
+
+
class BaseStringLength(BaseString):
    """String whose UTF-8 encoded length must fall within `_min_bytes`/`_max_bytes`."""

    _min_bytes: int = 1
    _max_bytes: int

    def validate(self) -> None:
        super().validate()
        # Fix: BaseString.validate() also accepts int values, which have no
        # .encode() — coerce to str first so length validation cannot crash.
        value_bytes = len(str(self._value).encode("utf-8"))
        if hasattr(self, "_min_bytes") and (value_bytes < self._min_bytes):
            msg = f"the string value {self._value} is shorter than the minimum {self._min_bytes} bytes."
            raise DataValueError(msg, self._tree_path)
        if hasattr(self, "_max_bytes") and (value_bytes > self._max_bytes):
            msg = f"the string value {self._value} is longer than the maximum {self._max_bytes} bytes."
            raise DataValueError(msg, self._tree_path)

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        typ: dict[str, Any] = {"type": "string"}
        if hasattr(cls, "_min_bytes"):
            typ["minLength"] = cls._min_bytes
        if hasattr(cls, "_max_bytes"):
            typ["maxLength"] = cls._max_bytes
        return typ
+
+
class BaseStringPattern(BaseString):
    """String that must match the class-level compiled regex `_re`."""

    _re: Pattern[str]

    def validate(self) -> None:
        super().validate()
        pattern = type(self)._re  # noqa: SLF001
        if pattern.match(self._value):
            return
        msg = f"'{self._value}' does not match '{self._re.pattern}' pattern"
        raise DataValueError(msg, self._tree_path)

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        return {"type": "string", "pattern": rf"{cls._re.pattern}"}
+
+
class BaseUnit(BaseString):
    """String holding a numeric magnitude with a unit suffix (units defined by `_units`)."""

    # Compiled from the subclass's `_units` keys on instantiation (see __init__).
    _re: Pattern[str]
    # Maps unit suffix -> multiplier to the base unit; subclasses must define it.
    _units: dict[str, int]

    def __init__(self, value: Any, tree_path: str = "/", base_path: Path = Path()) -> None:
        super().__init__(value, tree_path, base_path)
        # NOTE(review): this rebinds the *class* attribute `_re` on every instance
        # construction — presumably a lazy-init shortcut; confirm it is intentional
        # (the same pattern gets recompiled repeatedly).
        type(self)._re = re.compile(rf"^(\d+)({r'|'.join(type(self)._units.keys())})$")  # noqa: SLF001

    def _get_base_value(self) -> float:
        """Validate the value and convert it to the base unit as a float."""
        cls = self.__class__

        super().validate()
        # Plain ints (bool excluded) are accepted as already being in the base unit.
        if isinstance(self._value, int) and not isinstance(self._value, bool):
            return self._value

        grouped = self._re.search(self._value)
        if grouped:
            val, unit = grouped.groups()
            # NOTE(review): the unit group in the regex is not optional, so `unit`
            # looks like it can never be None when the search matched — confirm.
            if unit is None:
                msg = f"Missing units. Accepted units are {list(cls._units.keys())}"
                raise DataValueError(msg, self._tree_path)
            if unit not in cls._units:
                msg = (
                    f"Used unexpected unit '{unit}' for {type(self).__name__}."
                    f" Accepted units are {list(cls._units.keys())}"
                )
                raise DataValueError(msg, self._tree_path)
            return float(val) * cls._units[unit]
        msg = (
            f"Unexpected value for '{type(self)}'."
            " Expected string that matches pattern "
            rf"'{type(self)._re.pattern}'."  # noqa: SLF001
            f" Positive integer and one of the units {list(type(self)._units.keys())}, got '{self._value}'."  # noqa: SLF001
        )
        raise DataValueError(msg, self._tree_path)

    def validate(self) -> None:
        # Conversion performs the full validation as a side effect.
        self._get_base_value()

    def __int__(self) -> int:
        return int(self._get_base_value())

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        return {"type": "string", "pattern": rf"{cls._re.pattern}"}
diff --git a/python/knot_resolver/utils/modeling/types/base_types.py b/python/knot_resolver/utils/modeling/types/base_types.py
new file mode 100644 (file)
index 0000000..a658d67
--- /dev/null
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any, TypeVar
+
+T = TypeVar("T")
+
+NoneType = type(None)
+
+
class BaseType:
    """Common base for all custom model value types.

    Wraps a raw ``value`` together with the configuration-tree path it came
    from (``tree_path``, used in error messages) and a filesystem ``base_path``
    for resolving relative paths.
    """

    def __init__(self, value: Any, tree_path: str = "/", base_path: Path = Path()) -> None:
        self._value = value
        self._tree_path = tree_path
        self._base_path = base_path

    def __repr__(self) -> str:
        cls = self.__class__
        return f'{cls.__name__}("{self._value}")'

    def __eq__(self, o: object) -> bool:
        cls = self.__class__
        return isinstance(o, cls) and o._value == self._value

    def __hash__(self) -> int:
        return hash(self._value)

    def __str__(self) -> str:
        return str(self._value)

    def __int__(self) -> int:
        raise NotImplementedError

    def validate(self) -> None:
        # Fix: the original signature was `def validate() -> None:` (missing
        # `self`), so `instance.validate()` raised TypeError instead of the
        # intended NotImplementedError.
        raise NotImplementedError

    @classmethod
    def json_schema(cls) -> dict[Any, Any]:
        raise NotImplementedError
diff --git a/python/knot_resolver/utils/modeling/types/inspect.py b/python/knot_resolver/utils/modeling/types/inspect.py
new file mode 100644 (file)
index 0000000..47a06a2
--- /dev/null
@@ -0,0 +1,102 @@
+from __future__ import annotations
+
+import inspect
+from typing import Any, Dict, List, Literal, Tuple, Union
+
+from knot_resolver.utils.modeling.errors import DataAnnotationError
+from knot_resolver.utils.modeling.types.base_generic_types import BaseGenericTypeWrapper
+
+NoneType = type(None)
+
+
def get_annotations(obj: Any) -> dict[Any, Any]:
    """Return *obj*'s annotations dict, preferring inspect.get_annotations."""
    getter = getattr(inspect, "get_annotations", None)
    if getter is not None:
        return getter(obj)
    # TODO: safe to remove in python3.10
    # This fallback only exists for older versions
    return obj.__dict__.get("__annotations__", {})
+
+
def get_generic_type_arguments(typ: Any) -> list[Any]:
    """Return the generic arguments of *typ* (e.g. [str, int] for dict[str, int]).

    Fix: `__args__` is a tuple; convert it so the return value actually matches
    the declared `list[Any]` return type (callers use len/in/indexing, which
    behave identically).
    """
    return list(getattr(typ, "__args__", []))
+
+
def get_generic_type_argument(typ: Any) -> Any:
    """Return the single generic argument of *typ*; raise if there isn't exactly one."""
    args = get_generic_type_arguments(typ)
    if len(args) != 1:
        msg = f"expected one generic type argument, got {len(args)}"
        raise DataAnnotationError(msg)
    return args[0]
+
+
def is_dict(typ: Any) -> bool:
    """Return True when *typ* is a dict generic alias (Dict[...] or dict[...])."""
    origin = getattr(typ, "__origin__", None)
    return origin in (Dict, dict)
+
+
def is_base_generic_type_wrapper(typ: Any) -> bool:
    """Return True when *typ* is a parametrized BaseGenericTypeWrapper subclass."""
    origin = getattr(typ, "__origin__", None)
    if not inspect.isclass(origin):
        return False
    return issubclass(origin, BaseGenericTypeWrapper)
+
+
def get_base_generic_type_wrapper_argument(typ: type[BaseGenericTypeWrapper[Any]]) -> Any:
    """Resolve the inner type argument wrapped by a parametrized BaseGenericTypeWrapper.

    Fix: both failure paths raised DataAnnotationError with an empty message,
    which made the errors useless for diagnosing a bad annotation.
    """
    if not hasattr(typ, "__origin__"):
        msg = f"'{typ}' is not a parametrized generic type (missing '__origin__')"
        raise DataAnnotationError(msg)

    origin = getattr(typ, "__origin__")
    if not hasattr(origin, "__orig_bases__"):
        msg = f"'{origin}' has no generic base classes (missing '__orig_bases__')"
        raise DataAnnotationError(msg)

    # Substitute the concrete argument into the wrapper's first generic base and
    # extract the resulting inner type.
    orig_base: list[Any] = getattr(origin, "__orig_bases__", [])[0]
    arg = get_generic_type_argument(typ)
    return get_generic_type_argument(orig_base[arg])
+
+
def is_list(typ: Any) -> bool:
    """Return True when *typ* is a list generic alias (List[...] or list[...])."""
    origin = getattr(typ, "__origin__", None)
    return origin in (List, list)
+
+
def is_literal(typ: Any) -> bool:
    """Return True when *typ* is a Literal[...] alias."""
    origin = getattr(typ, "__origin__", None)
    return origin == Literal
+
+
+def is_none_type(typ: Any) -> bool:
+    return typ is None or typ == NoneType
+
+
def is_optional(typ: Any) -> bool:
    """Return True for Optional[X], i.e. a Union of exactly one real type and None."""
    if getattr(typ, "__origin__", None) != Union:
        return False
    args = get_generic_type_arguments(typ)
    optional_len = 2
    return len(args) == optional_len and NoneType in args
+
+
def is_tuple(typ: Any) -> bool:
    """Return True when *typ* is a tuple generic alias (Tuple[...] or tuple[...])."""
    origin = getattr(typ, "__origin__", None)
    return origin in (Tuple, tuple)
+
+
def is_union(typ: Any) -> bool:
    """Return True when *typ* is a typing.Union alias (including Optional)."""
    origin = getattr(typ, "__origin__", None)
    return origin == Union
+
+
def get_optional_inner_type(optional: Any) -> Any:
    """Return X for Optional[X]; raise DataAnnotationError for anything else."""
    if is_optional(optional):
        for arg in get_generic_type_arguments(optional):
            if not is_none_type(arg):
                return arg
    msg = "failed to get inner optional type"
    raise DataAnnotationError(msg)
+
+
def getattr_type(obj: Any, attr_name: str) -> Any:
    """Return the annotated type of attribute *attr_name* on *obj*'s class.

    Fix: the original used `hasattr(annot, attr_name)`, which tests attributes
    of the dict object itself rather than its keys, so the lookup failed for
    every ordinary annotation. Membership (`in`) is the correct check.
    """
    annot = get_annotations(type(obj))
    if attr_name in annot:
        return annot[attr_name]
    msg = "attribute name is missing in data annotations"
    raise DataAnnotationError(msg)
+
+
def is_attr_name_private(attr_name: str) -> bool:
    """Return True for names with a leading underscore (private by convention)."""
    return attr_name[:1] == "_"
diff --git a/tests/python/knot_resolver/utils/modeling/test_errors.py b/tests/python/knot_resolver/utils/modeling/test_errors.py
new file mode 100644 (file)
index 0000000..0486ff7
--- /dev/null
@@ -0,0 +1,46 @@
+import pytest
+
+from knot_resolver.utils.modeling.errors import (
+    AggrDataValidationError,
+    DataAnnotationError,
+    DataDescriptionError,
+    DataModelingError,
+    DataTypeError,
+    DataValidationError,
+    DataValueError,
+)
+
# Shared fixture: one instance of every concrete (non-aggregating) modeling
# error, reused as the nested child errors in both tests below.
errors = [
    DataModelingError("this is data modeling error message", "/error"),
    DataAnnotationError("this is annotation error message", "/annotation"),
    DataDescriptionError("this is description error message", "/description"),
    DataTypeError("this is type error message", "/type"),
    DataValueError("this is value error message", "/value"),
]
+
+
def test_data_validation_error() -> None:
    """DataValidationError renders its own message plus every child error, indented."""
    expected_msg = """Data validation error detected:
    [/validation] this is validation error message
        [/error] this is data modeling error message
        [/annotation] annotation error: this is annotation error message
        [/description] description error: this is description error message
        [/type] type error: this is type error message
        [/value] value error: this is value error message"""

    with pytest.raises(DataValidationError) as exc_info:
        raise DataValidationError("this is validation error message", "/validation", errors)
    assert str(exc_info.value) == expected_msg
+
+
def test_aggregate_data_validation_error() -> None:
    """AggrDataValidationError renders a flat, indented list of its child errors."""
    expected_msg = """Data validation errors detected:
    [/error] this is data modeling error message
    [/annotation] annotation error: this is annotation error message
    [/description] description error: this is description error message
    [/type] type error: this is type error message
    [/value] value error: this is value error message"""

    with pytest.raises(AggrDataValidationError) as exc_info:
        raise AggrDataValidationError("/", errors)
    assert str(exc_info.value) == expected_msg
diff --git a/tests/python/knot_resolver/utils/modeling/test_parsing.py b/tests/python/knot_resolver/utils/modeling/test_parsing.py
new file mode 100644 (file)
index 0000000..63fe122
--- /dev/null
@@ -0,0 +1,122 @@
+import pytest
+
+from knot_resolver.utils.modeling.errors import DataParsingError
+from knot_resolver.utils.modeling.parsing import parse_json, parse_yaml, try_to_parse
+
# Well-formed JSON document covering all scalar and container kinds; must
# parse to `data_dict` below.
json_data = """
{
    "none": null,
    "boolean": false,
    "number": 2026,
    "string": "this is string",
    "object": {
        "number": 5000,
        "string": "this is object string"
    },
    "array": [
        "item1",
        "item2",
        "item3"
    ]
}
"""

# JSON with a duplicated key at the top level -- parsers must reject it.
json_data_duplicates = """
{
    "duplicity-key": 1,
    "duplicity-key": 2
}
"""

# JSON with a duplicated key inside a nested object -- parsers must reject it.
json_data_duplicates_inner = """
{
    "object": {
        "duplicity-key": 1,
        "duplicity-key": 2
    }
}
"""

# YAML equivalent of `json_data`; must also parse to `data_dict`.
yaml_data = """
none: null
boolean: false
number: 2026
string: this is string
object:
  number: 5000
  string: this is object string
array:
  - item1
  - item2
  - item3
"""

# YAML with a duplicated top-level key -- parse_yaml must reject it.
yaml_data_duplicates = """
duplicity-key: 1
duplicity-key: 2
"""

# YAML with a duplicated key inside a nested mapping -- parse_yaml must reject it.
yaml_data_duplicates_inner = """
object:
    duplicity-key: 1
    duplicity-key: 2
"""

# Expected Python representation of both `json_data` and `yaml_data`.
data_dict = {
    "none": None,
    "boolean": False,
    "number": 2026,
    "string": "this is string",
    "object": {
        "number": 5000,
        "string": "this is object string",
    },
    "array": [
        "item1",
        "item2",
        "item3",
    ],
}
+
+
def test_parse_json() -> None:
    """parse_json turns the JSON document into the expected plain dict."""
    parsed = parse_json(json_data)
    assert parsed == data_dict
+
+
@pytest.mark.parametrize("data", [json_data, yaml_data])
def test_parse_yaml(data: str) -> None:
    """parse_yaml accepts both YAML and JSON (a YAML subset) input."""
    parsed = parse_yaml(data)
    assert parsed == data_dict
+
+
@pytest.mark.parametrize("data", [json_data_duplicates, json_data_duplicates_inner])
def test_parse_json_duplicates(data: str) -> None:
    """Duplicate keys (top-level or nested) are rejected by parse_json."""
    with pytest.raises(DataParsingError):
        parse_json(data)
+
+
@pytest.mark.parametrize(
    "data",
    [json_data_duplicates, json_data_duplicates_inner, yaml_data_duplicates, yaml_data_duplicates_inner],
)
def test_parse_yaml_duplicates(data: str) -> None:
    """Duplicate keys in either JSON or YAML input are rejected by parse_yaml."""
    with pytest.raises(DataParsingError):
        parse_yaml(data)
+
+
@pytest.mark.parametrize("data", [json_data, yaml_data])
def test_try_to_parse(data: str) -> None:
    """try_to_parse auto-detects the format and yields the expected dict."""
    parsed = try_to_parse(data)
    assert parsed == data_dict
diff --git a/tests/python/knot_resolver/utils/modeling/types/test_base_float_types.py b/tests/python/knot_resolver/utils/modeling/types/test_base_float_types.py
new file mode 100644 (file)
index 0000000..478e3f2
--- /dev/null
@@ -0,0 +1,79 @@
+import random
+import sys
+from typing import Any, Optional
+
+import pytest
+
+from knot_resolver.utils.modeling.errors import DataModelingError
+from knot_resolver.utils.modeling.types.base_float_types import BaseFloat, BaseFloatRange
+
+
@pytest.mark.parametrize("value", [-65.535, -1, 0, 1, 65.535])
def test_base_float(value: float):
    # Annotation widened from `int` to `float`: the parametrization mixes
    # ints and floats, and BaseFloat is exercised with both.
    obj = BaseFloat(value)
    obj.validate()
    assert float(obj) == value
    assert int(obj) == int(value)
    assert str(obj) == f"{value}"
+
+
@pytest.mark.parametrize("value", [True, False, "1"])
def test_base_float_invalid(value: Any):
    """Booleans and strings are rejected by BaseFloat validation."""
    with pytest.raises(DataModelingError):
        obj = BaseFloat(value)
        obj.validate()
+
+
@pytest.mark.parametrize("min,max", [(0.0, None), (None, 0.0), (1.5, 65.535), (-65.535, -1.5)])
def test_base_float_range(min: Optional[float], max: Optional[float]):
    """Values at and inside the configured bounds validate successfully.

    BUG FIX: bounds are now tested with `is not None` -- the original used
    truthiness (`if min:`), which silently skipped the 0.0 bound in the
    (0.0, None) / (None, 0.0) parametrizations, leaving that side untested.
    """

    class TestFloatRange(BaseFloatRange):
        if min is not None:
            _min = min
        if max is not None:
            _max = max

    def check(value: float) -> None:
        # One valid value: constructs, validates, and round-trips conversions.
        obj = TestFloatRange(value)
        obj.validate()
        assert float(obj) == float(value)
        assert int(obj) == int(value)
        assert str(obj) == f"{value}"

    # The bounds themselves are valid (inclusive range).
    if min is not None:
        check(min)
    if max is not None:
        check(max)

    # Random integral samples strictly inside the range.
    rmin = int(min + 1) if min is not None else -sys.maxsize - 1
    rmax = int(max - 1) if max is not None else sys.maxsize
    for value in (float(random.randint(rmin, rmax)) for _ in range(100)):
        check(value)
+
+
@pytest.mark.parametrize("min,max", [(0.0, None), (None, 0.0), (1.5, 65.535), (-65.535, -1.5)])
def test_base_float_range_invalid(min: Optional[float], max: Optional[float]):
    """Values strictly outside the configured bounds fail validation.

    BUG FIXES vs. the original:
    - `range(n % 2)` with n=100 is `range(0)`, so no samples were generated and
      the test was vacuous; each side now contributes 50 samples.
    - the below-minimum samples were guarded by `if max` instead of `if min`.
    - truthiness checks on the bounds silently skipped a bound of 0.0; `is not
      None` is used instead.
    - out-of-range sample limits are derived with floor/ceil, so a fractional
      bound (e.g. -65.535, where `int(min + 1) - 1 == -65` is in range) can no
      longer produce an in-range "invalid" sample.
    """
    import math  # local: only this test needs floor/ceil

    class TestFloatRange(BaseFloatRange):
        if min is not None:
            _min = min
        if max is not None:
            _max = max

    half = 50
    invalid_nums = []
    if max is not None:
        above = math.ceil(max) + 1  # strictly greater than any valid value
        invalid_nums.extend(float(random.randint(above, sys.maxsize)) for _ in range(half))
    if min is not None:
        below = math.floor(min) - 1  # strictly smaller than any valid value
        invalid_nums.extend(float(random.randint(-sys.maxsize - 1, below)) for _ in range(half))

    for num in invalid_nums:
        with pytest.raises(DataModelingError):
            TestFloatRange(num).validate()
diff --git a/tests/python/knot_resolver/utils/modeling/types/test_base_generic_types.py b/tests/python/knot_resolver/utils/modeling/types/test_base_generic_types.py
new file mode 100644 (file)
index 0000000..ed77f4e
--- /dev/null
@@ -0,0 +1,26 @@
+from typing import Any, List, Union
+
+import pytest
+
+from knot_resolver.utils.modeling.types.base_generic_types import ListOrItem
+from knot_resolver.utils.modeling.types.inspect import get_base_generic_type_wrapper_argument
+
+
@pytest.mark.parametrize("typ", [str, int, float, bool])
def test_list_or_item_inner_type(typ: Any):
    """The wrapped type argument of ListOrItem[T] is Union[List[T], T]."""
    expected = Union[List[typ], typ]
    assert get_base_generic_type_wrapper_argument(ListOrItem[typ]) == expected
+
+
@pytest.mark.parametrize(
    "value",
    [
        [],
        65_535,
        [1, 65_535, 5335, 5000],
    ],
)
def test_list_or_item(value: Any):
    """Iterating ListOrItem yields list items in order, or the single wrapped scalar.

    BUG FIX: the original `assert item == value[i] if isinstance(value, list) else value`
    parsed as `assert (item == value[i]) if ... else value`, so for a scalar input it
    only asserted the truthiness of `value` and never checked equality.
    """
    obj = ListOrItem(value)
    assert str(obj) == str(value)
    # Normalize the expectation: a scalar behaves like a one-item list.
    expected_items = value if isinstance(value, list) else [value]
    for i, item in enumerate(obj):
        assert item == expected_items[i]
diff --git a/tests/python/knot_resolver/utils/modeling/types/test_base_integer_types.py b/tests/python/knot_resolver/utils/modeling/types/test_base_integer_types.py
new file mode 100644 (file)
index 0000000..9e88041
--- /dev/null
@@ -0,0 +1,75 @@
+import random
+import sys
+from typing import Any, Optional
+
+import pytest
+
+from knot_resolver.utils.modeling.errors import DataModelingError
+from knot_resolver.utils.modeling.types.base_integer_types import BaseInteger, BaseIntegerRange
+
+
@pytest.mark.parametrize("value", [-65535, -1, 0, 1, 65535])
def test_base_integer(value: int):
    """A plain int round-trips through BaseInteger and validates."""
    obj = BaseInteger(value)
    obj.validate()
    assert int(obj) == value
    assert str(obj) == str(value)
+
+
@pytest.mark.parametrize("value", [True, False, "1", 1.1])
def test_base_integer_invalid(value: Any):
    """Booleans, strings and floats are rejected by BaseInteger validation."""
    with pytest.raises(DataModelingError):
        obj = BaseInteger(value)
        obj.validate()
+
+
@pytest.mark.parametrize("min,max", [(0, None), (None, 0), (1, 65535), (-65535, -1)])
def test_base_integer_range(min: Optional[int], max: Optional[int]):
    """Values at and inside the configured bounds validate successfully.

    BUG FIX: bounds are now tested with `is not None` -- the original used
    truthiness (`if min:`), which silently skipped the 0 bound in the
    (0, None) / (None, 0) parametrizations, leaving that side untested.
    """

    class TestIntegerRange(BaseIntegerRange):
        if min is not None:
            _min = min
        if max is not None:
            _max = max

    def check(value: int) -> None:
        # One valid value: constructs, validates, and round-trips conversions.
        obj = TestIntegerRange(value)
        obj.validate()
        assert int(obj) == value
        assert str(obj) == f"{value}"

    # The bounds themselves are valid (inclusive range).
    if min is not None:
        check(min)
    if max is not None:
        check(max)

    # Random samples across the whole valid range.
    rmin = min if min is not None else -sys.maxsize - 1
    rmax = max if max is not None else sys.maxsize
    for value in (random.randint(rmin, rmax) for _ in range(100)):
        check(value)
+
+
@pytest.mark.parametrize("min,max", [(0, None), (None, 0), (1, 65535), (-65535, -1)])
def test_base_integer_range_invalid(min: Optional[int], max: Optional[int]):
    """Values strictly outside the configured bounds fail validation.

    BUG FIXES vs. the original:
    - `range(n % 2)` with n=100 is `range(0)`, so no samples were generated and
      the test was vacuous; each side now contributes 50 samples.
    - the below-minimum samples were guarded by `if max` instead of `if min`.
    - truthiness checks on the bounds silently skipped a bound of 0; `is not
      None` is used instead.
    """

    class TestIntegerRange(BaseIntegerRange):
        if min is not None:
            _min = min
        if max is not None:
            _max = max

    half = 50
    invalid_nums = []
    if max is not None:
        # Strictly above the maximum.
        invalid_nums.extend(random.randint(max + 1, sys.maxsize) for _ in range(half))
    if min is not None:
        # Strictly below the minimum.
        invalid_nums.extend(random.randint(-sys.maxsize - 1, min - 1) for _ in range(half))

    for num in invalid_nums:
        with pytest.raises(DataModelingError):
            TestIntegerRange(num).validate()
diff --git a/tests/python/knot_resolver/utils/modeling/types/test_base_string_types.py b/tests/python/knot_resolver/utils/modeling/types/test_base_string_types.py
new file mode 100644 (file)
index 0000000..e7da68f
--- /dev/null
@@ -0,0 +1,109 @@
+import random
+import string
+from typing import Any, Optional
+
+import pytest
+
+from knot_resolver.utils.modeling.errors import DataModelingError
+from knot_resolver.utils.modeling.types.base_string_types import BaseString, BaseStringLength, BaseUnit
+
+
@pytest.mark.parametrize("value", [-65_535, -1, 0, 1, 65_535, "a", "abcdef"])
def test_base_string(value: Any):
    # Annotation widened from `str` to `Any`: the parametrization also feeds
    # ints, which BaseString is expected to accept and stringify.
    obj = BaseString(value)
    obj.validate()
    assert str(obj) == str(value)
+
+
@pytest.mark.parametrize("value", [True, False])
def test_base_string_invalid(value: Any):
    """Booleans are rejected by BaseString validation."""
    with pytest.raises(DataModelingError):
        obj = BaseString(value)
        obj.validate()
+
+
@pytest.mark.parametrize("min,max", [(None, 100), (10, 20), (50, None)])
def test_base_string_length(min: Optional[int], max: Optional[int]):
    """Strings with lengths at and inside the configured bounds validate.

    Bounds are tested with `is not None` (rather than truthiness) so a future
    bound of 0 would not be silently skipped, matching the fix applied to the
    numeric range tests.
    """

    class TestStringLength(BaseStringLength):
        if min is not None:
            _min_bytes = min
        if max is not None:
            _max_bytes = max

    alphabet = string.ascii_uppercase + string.digits

    def check(length: int) -> None:
        # One valid random string of the given length.
        rand_str = "".join(random.choices(alphabet, k=length))
        obj = TestStringLength(rand_str)
        obj.validate()
        assert str(obj) == rand_str

    # The bounds themselves are valid (inclusive range).
    if min is not None:
        check(min)
    if max is not None:
        check(max)

    # Random lengths across the whole valid range.
    rmin = min if min is not None else 1
    rmax = max if max is not None else 200
    for _ in range(100):
        check(random.randint(rmin, rmax))
+
+
@pytest.mark.parametrize("min,max", [(None, 100), (10, 20), (50, None)])
def test_base_string_length_invalid(min: Optional[int], max: Optional[int]):
    """Strings strictly outside the configured length bounds fail validation.

    BUG FIXES vs. the original:
    - `range(n % 2)` with n=100 is `range(0)`, so no samples were generated and
      the test was vacuous; each side now contributes 50 samples.
    - the too-short samples were guarded by `if max` instead of `if min`.
    - sample lengths could land exactly on a bound (`k` up to `rmax` resp.
      `rmin`), which is a *valid* length; the ranges now exclude the bounds.
    """

    class TestStringLength(BaseStringLength):
        if min is not None:
            _min_bytes = min
        if max is not None:
            _max_bytes = max

    alphabet = string.ascii_uppercase + string.digits
    half = 50
    invalid_strings = []
    if max is not None:
        # Strictly longer than the maximum.
        invalid_strings.extend(
            "".join(random.choices(alphabet, k=random.randint(max + 1, max + 20))) for _ in range(half)
        )
    if min is not None:
        # Strictly shorter than the minimum (length 0 included).
        invalid_strings.extend(
            "".join(random.choices(alphabet, k=random.randint(0, min - 1))) for _ in range(half)
        )

    for invalid_string in invalid_strings:
        with pytest.raises(DataModelingError):
            TestStringLength(invalid_string).validate()
+
+
@pytest.mark.parametrize("value", [1000, "1000a", "100b", "10c", "1d"])
def test_base_unit(value: Any):
    # Annotation widened from `str` to `Any`: the parametrization includes a
    # plain int, which BaseUnit is expected to accept as-is.
    class TestBaseUnit(BaseUnit):
        # Each suffix multiplies the numeric prefix; all inputs resolve to 1000.
        _units = {"a": 1, "b": 10, "c": 100, "d": 1000}

    obj = TestBaseUnit(value)
    obj.validate()
    assert int(obj) == 1000
+
+
@pytest.mark.parametrize("value", [True, False, "1000aa", "10ab", "1e"])
def test_base_unit_invalid(value: Any):
    """Booleans, doubled/mixed suffixes and unknown units are rejected."""

    class TestBaseUnit(BaseUnit):
        _units = {"a": 1, "b": 10, "c": 100, "d": 1000}

    with pytest.raises(DataModelingError):
        obj = TestBaseUnit(value)
        obj.validate()
diff --git a/tests/python/knot_resolver/utils/modeling/types/test_inspect.py b/tests/python/knot_resolver/utils/modeling/types/test_inspect.py
new file mode 100644 (file)
index 0000000..856305e
--- /dev/null
@@ -0,0 +1,82 @@
+import sys
+from typing import Any, Dict, List, Literal, Optional, Tuple, Union
+
+import pytest
+
+from knot_resolver.utils.modeling import ModelNode
+from knot_resolver.utils.modeling.types.base_types import BaseType
+from knot_resolver.utils.modeling.types.inspect import (
+    NoneType,
+    is_dict,
+    is_list,
+    is_literal,
+    is_none_type,
+    is_optional,
+    is_tuple,
+    is_union,
+)
+
# Representative sample of plain, typing, and project types used to
# parameterize the inspect predicate tests below.
types = [
    Any,
    bool,
    int,
    float,
    str,
    BaseType,
    ModelNode,
]

# Literal[...] forms covering single and multiple values of several kinds.
literals = [
    Literal[Any],
    Literal[True, False],
    Literal[0],
    Literal[1, 2, 3],
    Literal["literal"],
    Literal["literal1", "literal2", "literal3"],
]
+
+
@pytest.mark.parametrize("typ", types)
def test_is_dict(typ: Any) -> None:
    """A parameterized Dict with any key type is recognized."""
    mapping_type = Dict[typ, Any]
    assert is_dict(mapping_type)
+
+
@pytest.mark.parametrize("typ", types)
def test_is_list(typ: Any) -> None:
    """Both typing.List[...] and (on 3.9+) builtin list[...] are recognized."""
    candidates = [List[typ]]
    if sys.version_info >= (3, 9):
        candidates.append(list[typ])
    for candidate in candidates:
        assert is_list(candidate)
+
+
@pytest.mark.parametrize("typ", literals)
def test_is_literal(typ: Any) -> None:
    """Every Literal[...] fixture form is recognized."""
    assert is_literal(typ) is True
+
+
@pytest.mark.parametrize("typ", [None, NoneType])
def test_is_none_type(typ: Any) -> None:
    """Both None and NoneType denote the absence of a type."""
    assert is_none_type(typ) is True
+
+
@pytest.mark.parametrize("typ", types)
def test_is_optional(typ: Any) -> None:
    """Optional[X] and (3.10+) PEP 604 `X | None` are recognized as optional.

    BUG FIX: the `typ | None` syntax (PEP 604) only exists from Python 3.10,
    not 3.9 -- the original `>= (3, 9)` guard let the expression raise
    TypeError on 3.9 interpreters.
    """
    assert is_optional(Optional[typ])
    if sys.version_info >= (3, 10):
        # NOTE(review): confirm is_optional also recognizes types.UnionType
        # (PEP 604) unions, whose origin differs from typing.Union.
        assert is_optional(typ | None)
        assert is_optional(None | typ)
+
+
@pytest.mark.parametrize("typ", types)
def test_is_tuple(typ: Any) -> None:
    """Both typing.Tuple[...] and (on 3.9+) builtin tuple[...] are recognized."""
    candidates = [Tuple[typ]]
    if sys.version_info >= (3, 9):
        candidates.append(tuple[typ])
    for candidate in candidates:
        assert is_tuple(candidate)
+
+
@pytest.mark.parametrize("typ", types)
def test_is_union(typ: Any) -> None:
    """Optional[X], Union[X, None] and (3.10+) PEP 604 unions are recognized.

    BUG FIX: the `typ | None` syntax (PEP 604) only exists from Python 3.10,
    not 3.9 -- the original `>= (3, 9)` guard let the expression raise
    TypeError on 3.9 interpreters.
    """
    assert is_union(Optional[typ])
    assert is_union(Union[typ, None])
    if sys.version_info >= (3, 10):
        # NOTE(review): confirm is_union also recognizes types.UnionType
        # (PEP 604) unions, whose origin differs from typing.Union.
        assert is_union(typ | None)
        assert is_union(None | typ)