from knot_resolver_manager.datamodel.network_config import Network, NetworkStrict
from knot_resolver_manager.datamodel.options_config import Options, OptionsStrict
from knot_resolver_manager.datamodel.server_config import Server, ServerStrict
-from knot_resolver_manager.utils import DataParser, DataValidator
+from knot_resolver_manager.utils import SchemaNode
def _import_lua_template() -> Template:
_LUA_TEMPLATE = _import_lua_template()
-class KresConfig(DataParser):
+class KresConfig(SchemaNode):
server: Server = Server()
options: Options = Options()
network: Network = Network()
lua: Lua = Lua()
-class KresConfigStrict(DataValidator):
+class KresConfigStrict(SchemaNode):
server: ServerStrict
options: OptionsStrict
network: NetworkStrict
from knot_resolver_manager.datamodel.types import IPv6Network96
-from knot_resolver_manager.utils import DataParser, DataValidator
+from knot_resolver_manager.utils import SchemaNode
-class Dns64(DataParser):
+class Dns64(SchemaNode):
prefix: IPv6Network96 = IPv6Network96("64:ff9b::/96")
-class Dns64Strict(DataValidator):
+class Dns64Strict(SchemaNode):
prefix: IPv6Network96
from typing import List, Optional
from knot_resolver_manager.datamodel.types import TimeUnit
-from knot_resolver_manager.utils import DataParser, DataValidator
+from knot_resolver_manager.utils import SchemaNode
-class TrustAnchorFile(DataParser):
+class TrustAnchorFile(SchemaNode):
file: str
read_only: bool = False
-class Dnssec(DataParser):
+class Dnssec(SchemaNode):
trust_anchor_sentinel: bool = True
trust_anchor_signal_query: bool = True
time_skew_detection: bool = True
trust_anchors_files: Optional[List[TrustAnchorFile]] = None
-class TrustAnchorFileStrict(DataValidator):
+class TrustAnchorFileStrict(SchemaNode):
file: str
read_only: bool
-class DnssecStrict(DataValidator):
+class DnssecStrict(SchemaNode):
trust_anchor_sentinel: bool
trust_anchor_signal_query: bool
time_skew_detection: bool
from typing import Optional
from knot_resolver_manager.exceptions import ValidationException
-from knot_resolver_manager.utils import DataParser, DataValidator
+from knot_resolver_manager.utils import SchemaNode
-class Lua(DataParser):
+class Lua(SchemaNode):
script_only: bool = False
script: Optional[str] = None
script_file: Optional[str] = None
-class LuaStrict(DataValidator):
+class LuaStrict(SchemaNode):
script_only: bool
script: Optional[str]
script_file: Optional[str]
from typing import List
-from knot_resolver_manager.utils import DataParser, DataValidator
+from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
KindEnum = LiteralEnum["dns", "xdp", "dot", "doh"]
-class Interface(DataParser):
+class Interface(SchemaNode):
listen: str
kind: KindEnum = "dns"
freebind: bool = False
-class InterfaceStrict(DataValidator):
+class InterfaceStrict(SchemaNode):
address: str
port: int
kind: str
return port_map.get(obj.kind, 0)
-class Network(DataParser):
+class Network(SchemaNode):
interfaces: List[Interface] = [Interface({"listen": "127.0.0.1"}), Interface({"listen": "::1", "freebind": True})]
-class NetworkStrict(DataValidator):
+class NetworkStrict(SchemaNode):
interfaces: List[InterfaceStrict]
from typing import Union
-from knot_resolver_manager.utils import DataParser, DataValidator
+from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
from .types import TimeUnit
GlueCheckingEnum = LiteralEnum["normal", "strict", "permissive"]
-class Prediction(DataParser):
+class Prediction(SchemaNode):
window: TimeUnit = TimeUnit("15m")
period: int = 24
-class Options(DataParser):
+class Options(SchemaNode):
glue_checking: GlueCheckingEnum = "normal"
qname_minimisation: bool = True
query_loopback: bool = False
prediction: Union[bool, Prediction] = False
-class PredictionStrict(DataValidator):
+class PredictionStrict(SchemaNode):
window: TimeUnit
period: int
-class OptionsStrict(DataValidator):
+class OptionsStrict(SchemaNode):
glue_checking: GlueCheckingEnum
qname_minimisation: bool
query_loopback: bool
from knot_resolver_manager.datamodel.types import AnyPath, Listen, ListenStrict
from knot_resolver_manager.exceptions import ValidationException
-from knot_resolver_manager.utils import DataParser, DataValidator
+from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
logger = logging.getLogger(__name__)
BackendEnum = LiteralEnum["auto", "systemd", "supervisord"]
-class Management(DataParser):
+class Management(SchemaNode):
listen: Listen = Listen({"unix-socket": "/tmp/manager.sock"})
backend: BackendEnum = "auto"
rundir: AnyPath = AnyPath(".")
-class ManagementStrict(DataValidator):
+class ManagementStrict(SchemaNode):
listen: ListenStrict
backend: BackendEnum
rundir: AnyPath
-class Webmgmt(DataParser):
+class Webmgmt(SchemaNode):
listen: Listen
tls: bool = False
cert_file: Optional[AnyPath] = None
key_file: Optional[AnyPath] = None
-class WebmgmtStrict(DataValidator):
+class WebmgmtStrict(SchemaNode):
listen: ListenStrict
tls: bool
cert_file: Optional[AnyPath]
key_file: Optional[AnyPath]
-class Server(DataParser):
+class Server(SchemaNode):
hostname: Optional[str] = None
groupid: Optional[str] = None
nsid: Optional[str]
webmgmt: Optional[Webmgmt] = None
-class ServerStrict(DataValidator):
+class ServerStrict(SchemaNode):
hostname: str
groupid: Optional[str]
nsid: Optional[str]
from knot_resolver_manager.exceptions import DataValidationException
from knot_resolver_manager.utils import CustomValueType
-from knot_resolver_manager.utils.data_parser_validator import DataParser, DataValidator
+from knot_resolver_manager.utils.data_parser_validator import SchemaNode
logger = logging.getLogger(__name__)
return str(self._value)
-class Listen(DataParser):
+class Listen(SchemaNode):
ip: Optional[str] = None
port: Optional[int] = None
unix_socket: Optional[AnyPath] = None
INTERFACE_AND_PORT = auto()
-class ListenStrict(DataValidator):
+class ListenStrict(SchemaNode):
typ: ListenType
ip: Optional[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]] = None
port: Optional[int] = None
from typing import Any, Callable, Iterable, Optional, Type, TypeVar
from .custom_types import CustomValueType
-from .data_parser_validator import DataParser, DataValidator, Format
+from .data_parser_validator import Format, SchemaNode
T = TypeVar("T")
__all__ = [
"Format",
"CustomValueType",
- "DataParser",
- "DataValidator",
+ "SchemaNode",
]
import json
import re
from enum import Enum, auto
-from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
+from typing import Any, Dict, List, Optional, Set, Tuple, Type, TypeVar, Union
import yaml
from yaml.constructor import ConstructorError
)
from knot_resolver_manager.utils.custom_types import CustomValueType
from knot_resolver_manager.utils.types import (
+ NoneType,
get_attr_type,
get_generic_type_argument,
get_generic_type_arguments,
is_list,
is_literal,
is_none_type,
+ is_optional,
is_tuple,
is_union,
)
return obj.serialize()
# nested SchemaNode class instances
- elif isinstance(obj, DataParser):
+ elif isinstance(obj, SchemaNode):
return obj.to_dict()
# otherwise just return, what we were given
# no validation performed, the implementation does it in the constructor
return cls(obj, object_path=object_path)
- # nested DataParser subclasses
- elif inspect.isclass(cls) and issubclass(cls, DataParser):
+ # nested SchemaNode subclasses
+ elif inspect.isclass(cls) and issubclass(cls, SchemaNode):
# we should return a SchemaNode; we expect a dict or another SchemaNode,
# because we can construct a SchemaNode from either
- if isinstance(obj, dict):
+ if isinstance(obj, (dict, SchemaNode)):
return cls(obj, object_path=object_path) # type: ignore
- raise DataParsingException(f"Expected '{dict}' object, found '{type(obj)}'", object_path)
-
- # nested DataValidator subclasses
- elif inspect.isclass(cls) and issubclass(cls, DataValidator):
- # we should return DataValidator, we expect to be given a DataParser,
- # because we can construct a DataValidator from it
- if isinstance(obj, DataParser):
- return cls(obj, object_path=object_path)
- raise DataParsingException(f"Expected instance of '{DataParser}' class, found '{type(obj)}'", object_path)
+ raise DataParsingException(f"Expected 'dict' or 'SchemaNode' object, found '{type(obj)}'", object_path)
# if the object matches, just pass it through
elif inspect.isclass(cls) and isinstance(obj, cls):
return formats[mime_type]
-_T = TypeVar("_T", bound="DataParser")
+_T = TypeVar("_T", bound="SchemaNode")
_SUBTREE_MUTATION_PATH_PATTERN = re.compile(r"^(/[^/]+)*/?$")
-class DataParser:
- def __init__(self, obj: Optional[Dict[Any, Any]] = None, object_path: str = "/"):
+TSource = Union[NoneType, Dict[Any, Any], "SchemaNode"]
+
+
+class SchemaNode:
+ def __init__(self, source: TSource = None, object_path: str = "/"):
cls = self.__class__
annot = cls.__dict__.get("__annotations__", {})
- used_keys: List[str] = []
+ used_keys: Set[str] = set()
for name, python_type in annot.items():
if is_internal_field(name):
continue
- val = None
- dash_name = name.replace("_", "-")
- if obj and dash_name in obj:
- val = obj[dash_name]
- used_keys.append(dash_name)
+ # convert naming (used when converting from json/yaml)
+ source_name = name.replace("_", "-") if isinstance(source, dict) else name
+
+ # populate field
+ if not source:
+ val = None
+ # we have a way how to create the value
+ elif hasattr(self, f"_{name}"):
+ val = self._get_converted_value(name, source, object_path)
+ used_keys.add(source_name) # the field might not exist, but that won't break anything
+ # source just contains the value
+ elif source_name in source:
+ val = source[source_name]
+ used_keys.add(source_name)
+ # the value is missing from the source, but a class-level default exists
+ elif getattr(self, name, ...) is not ...:
+ val = None
+ # the value is optional and there is nothing
+ elif is_optional(python_type):
+ val = None
+ # we expected a value but it was not there
+ else:
+ raise DataValidationException(f"Missing attribute '{source_name}'.", object_path)
use_default = hasattr(cls, name)
default = getattr(cls, name, ...)
value = _validated_object_type(python_type, val, default, use_default, object_path=f"{object_path}/{name}")
setattr(self, name, value)
- # check for unused keys
- if obj:
- for key in obj:
- if key not in used_keys:
- additional_info = ""
- if "_" in key:
- additional_info = (
- " The problem might be that you are using '_', but you should be using '-' instead."
- )
- raise DataParsingException(
- f"Attribute '{key}' was not provided with any value." + additional_info, object_path
- )
+ # check for unused keys in case the source is a dict
+ if source and isinstance(source, dict):
+ unused = source.keys() - used_keys
+ if len(unused) > 0:
+ raise DataParsingException(
+ f"Keys {unused} in your configuration object are not part of the configuration schema."
+ " Are you using '-' instead of '_'?",
+ object_path,
+ )
+
+ # validate the constructed value
+ self._validate()
+
+ def _get_converted_value(self, key: str, source: TSource, object_path: str) -> Any:
+ try:
+ return getattr(self, f"_{key}")(source)
+ except (ValueError, ValidationException) as e:
+ if len(e.args) > 0 and isinstance(e.args[0], str):
+ msg = e.args[0]
+ else:
+ msg = "Failed to validate value type"
+ raise DataValidationException(msg, object_path) from e
+
+ def __getitem__(self, key: str) -> Any:
+ if not hasattr(self, key):
+ raise RuntimeError(f"Object '{self}' of type '{type(self)}' does not have field named '{key}'")
+ return getattr(self, key)
+
+ def __contains__(self, item: Any) -> bool:
+ return hasattr(self, item)
+
+ def validate(self) -> None:
+ for field_name in dir(self):
+ if is_internal_field(field_name):
+ continue
+
+ field = getattr(self, field_name)
+ if isinstance(field, SchemaNode):
+ field.validate()
+ self._validate()
+
+ def _validate(self) -> None:
+ pass
@classmethod
def parse_from(cls: Type[_T], fmt: Format, text: str):
setattr(parent, last_name, parsed_value)
return to_mutate
-
-
-class DataValidator:
- def __init__(self, obj: DataParser, object_path: str = ""):
- cls = self.__class__
- anot = cls.__dict__.get("__annotations__", {})
-
- for attr_name, attr_type in anot.items():
- if is_internal_field(attr_name):
- continue
-
- # use transformation function if available
- if hasattr(self, f"_{attr_name}"):
- try:
- value = getattr(self, f"_{attr_name}")(obj)
- except (ValueError, ValidationException) as e:
- if len(e.args) > 0 and isinstance(e.args[0], str):
- msg = e.args[0]
- else:
- msg = "Failed to validate value type"
- raise DataValidationException(msg, object_path) from e
- elif hasattr(obj, attr_name):
- value = getattr(obj, attr_name)
- else:
- raise DataValidationException(
- f"DataParser object {obj} is missing '{attr_name}' attribute.", object_path
- )
-
- setattr(self, attr_name, _validated_object_type(attr_type, value))
-
- self._validate()
-
- def validate(self) -> None:
- for field_name in dir(self):
- if is_internal_field(field_name):
- continue
-
- field = getattr(self, field_name)
- if isinstance(field, DataValidator):
- field.validate()
- self._validate()
-
- def _validate(self) -> None:
- pass
TimeUnit,
)
from knot_resolver_manager.exceptions import KresdManagerException
-from knot_resolver_manager.utils import DataParser, DataValidator
+from knot_resolver_manager.utils import SchemaNode
def test_size_unit():
def test_parsing_units():
- class TestClass(DataParser):
+ class TestClass(SchemaNode):
size: SizeUnit
time: TimeUnit
- class TestClassStrict(DataValidator):
+ class TestClassStrict(SchemaNode):
size: int
time: int
def test_anypath():
- class Data(DataParser):
+ class Data(SchemaNode):
p: AnyPath
assert str(Data.from_yaml('p: "/tmp"').p) == "/tmp"
from typing_extensions import Literal
from knot_resolver_manager.exceptions import DataParsingException
-from knot_resolver_manager.utils import DataParser, DataValidator, Format
+from knot_resolver_manager.utils import Format, SchemaNode
def test_primitive():
- class TestClass(DataParser):
+ class TestClass(SchemaNode):
i: int
s: str
b: bool
- class TestClassStrict(DataValidator):
+ class TestClassStrict(SchemaNode):
i: int
s: str
b: bool
def test_parsing_primitive_exceptions():
- class TestStr(DataParser):
+ class TestStr(SchemaNode):
s: str
# int and float are allowed inputs for string
with raises(DataParsingException):
TestStr.from_yaml("s: false") # bool
- class TestInt(DataParser):
+ class TestInt(SchemaNode):
i: int
with raises(DataParsingException):
with raises(DataParsingException):
TestInt.from_yaml("i: 5.5") # float
- class TestBool(DataParser):
+ class TestBool(SchemaNode):
b: bool
with raises(DataParsingException):
def test_nested():
- class Lower(DataParser):
+ class Lower(SchemaNode):
i: int
- class Upper(DataParser):
+ class Upper(SchemaNode):
l: Lower
- class LowerStrict(DataValidator):
+ class LowerStrict(SchemaNode):
i: int
def _validate(self) -> None:
pass
- class UpperStrict(DataValidator):
+ class UpperStrict(SchemaNode):
l: LowerStrict
def _validate(self) -> None:
def test_simple_compount_types():
- class TestClass(DataParser):
+ class TestClass(SchemaNode):
l: List[int]
d: Dict[str, str]
t: Tuple[str, int]
o: Optional[int]
- class TestClassStrict(DataValidator):
+ class TestClassStrict(SchemaNode):
l: List[int]
d: Dict[str, str]
t: Tuple[str, int]
def test_nested_compound_types():
- class TestClass(DataParser):
+ class TestClass(SchemaNode):
o: Optional[Dict[str, str]]
- class TestClassStrict(DataValidator):
+ class TestClassStrict(SchemaNode):
o: Optional[Dict[str, str]]
def _validate(self) -> None:
def test_nested_compount_types2():
- class TestClass(DataParser):
+ class TestClass(SchemaNode):
i: int
o: Optional[Dict[str, str]]
- class TestClassStrict(DataValidator):
+ class TestClassStrict(SchemaNode):
i: int
o: Optional[Dict[str, str]]
def test_partial_mutations():
- class Inner(DataParser):
+ class Inner(SchemaNode):
size: int = 5
- class ConfData(DataParser):
+ class ConfData(SchemaNode):
workers: Union[Literal["auto"], int] = 1
lua_config: Optional[str] = None
inner: Inner = Inner()
- class InnerStrict(DataValidator):
+ class InnerStrict(SchemaNode):
size: int
def _validate(self) -> None:
pass
- class ConfDataStrict(DataValidator):
+ class ConfDataStrict(SchemaNode):
workers: int
lua_config: Optional[str]
inner: InnerStrict