from typing import List, Optional
from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, SizeUnit, TimeUnit
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class PrefillSchema(SchemaNode):
+class PrefillSchema(BaseSchema):
"""
Prefill the cache periodically by importing zone data obtained over HTTP.
raise ValueError("cache prefilling is not yet supported for non-root zones")
-class CacheSchema(SchemaNode):
+class CacheSchema(BaseSchema):
"""
DNS resolver cache configuration.
from knot_resolver_manager.datamodel.types.types import IntPositive, UncheckedPath
from knot_resolver_manager.datamodel.view_schema import ViewSchema
from knot_resolver_manager.datamodel.webmgmt_schema import WebmgmtSchema
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
logger = logging.getLogger(__name__)
return cpus
-class KresConfig(SchemaNode):
- class Raw(SchemaNode):
+class KresConfig(BaseSchema):
+ class Raw(BaseSchema):
"""
Knot Resolver declarative configuration.
from knot_resolver_manager.datamodel.types import IPv6Network96
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class Dns64Schema(SchemaNode):
+class Dns64Schema(BaseSchema):
"""
DNS64 (RFC 6147) configuration.
from typing import List, Optional
from knot_resolver_manager.datamodel.types import IntNonNegative, TimeUnit
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class TrustAnchorFileSchema(SchemaNode):
+class TrustAnchorFileSchema(BaseSchema):
"""
Trust-anchor zonefile configuration.
read_only: bool = False
-class DnssecSchema(SchemaNode):
+class DnssecSchema(BaseSchema):
"""
DNSSEC configuration.
from knot_resolver_manager.datamodel.policy_schema import ForwardServerSchema
from knot_resolver_manager.datamodel.types import DomainName, IPAddressOptionalPort, PolicyFlagEnum
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class ForwardZoneSchema(SchemaNode):
+class ForwardZoneSchema(BaseSchema):
"""
Configuration of Forward Zone.
from typing_extensions import Literal, TypeAlias
from knot_resolver_manager.datamodel.types import CheckedPath, TimeUnit
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
LogLevelEnum = Literal["crit", "err", "warning", "notice", "info", "debug"]
LogTargetEnum = Literal["syslog", "stderr", "stdout"]
]
-class DnstapSchema(SchemaNode):
+class DnstapSchema(BaseSchema):
"""
Logging DNS queries and responses to a unix socket.
log_tcp_rtt: bool = True
-class DebuggingSchema(SchemaNode):
+class DebuggingSchema(BaseSchema):
"""
Advanced debugging parameters for kresd (Knot Resolver daemon).
assertion_fork: TimeUnit = TimeUnit("5m")
-class LoggingSchema(SchemaNode):
+class LoggingSchema(BaseSchema):
"""
Logging and debugging configuration.
from typing import Optional
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class LuaSchema(SchemaNode):
+class LuaSchema(BaseSchema):
"""
Custom Lua configuration.
from typing import Optional
from knot_resolver_manager.datamodel.types import CheckedPath, IPAddressPort
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class ManagementSchema(SchemaNode):
+class ManagementSchema(BaseSchema):
"""
Configuration of management HTTP API.
from typing_extensions import Literal
from knot_resolver_manager.datamodel.types import DomainName, IPAddress, PortNumber, TimeUnit
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class GraphiteSchema(SchemaNode):
+class GraphiteSchema(BaseSchema):
host: Union[IPAddress, DomainName]
port: PortNumber = PortNumber(2003)
prefix: str = ""
tcp: bool = False
-class MonitoringSchema(SchemaNode):
+class MonitoringSchema(BaseSchema):
"""
---
enabled: configures, whether statistics module will be loaded into resolver
PortNumber,
SizeUnit,
)
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
KindEnum = Literal["dns", "xdp", "dot", "doh-legacy", "doh2"]
-class EdnsBufferSizeSchema(SchemaNode):
+class EdnsBufferSizeSchema(BaseSchema):
"""
EDNS payload size advertised in DNS packets.
downstream: SizeUnit = SizeUnit("1232B")
-class AddressRenumberingSchema(SchemaNode):
+class AddressRenumberingSchema(BaseSchema):
"""
Renumbers addresses in answers to different address space.
destination: IPAddress
-class TLSSchema(SchemaNode):
+class TLSSchema(BaseSchema):
"""
TLS configuration, also affects DNS over TLS and DNS over HTTPS.
raise ValueError("'sticket_secret' and 'sticket_secret_file' are both defined, only one can be used")
-class ListenSchema(SchemaNode):
- class Raw(SchemaNode):
+class ListenSchema(BaseSchema):
+ class Raw(BaseSchema):
"""
Configuration of listening interface.
)
-class ProxyProtocolSchema(SchemaNode):
+class ProxyProtocolSchema(BaseSchema):
"""
PROXYv2 protocol configuration.
allow: List[Union[IPAddress, IPNetwork]]
-class NetworkSchema(SchemaNode):
+class NetworkSchema(BaseSchema):
"""
Network connections and protocols configuration.
from typing_extensions import Literal
from knot_resolver_manager.datamodel.types import IntNonNegative, TimeUnit
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
GlueCheckingEnum = Literal["normal", "strict", "permissive"]
-class PredictionSchema(SchemaNode):
+class PredictionSchema(BaseSchema):
"""
Helps keep the cache hot by prefetching expiring records and learning usage patterns and repetitive queries.
period: IntNonNegative = IntNonNegative(24)
-class OptionsSchema(SchemaNode):
- class Raw(SchemaNode):
+class OptionsSchema(BaseSchema):
+ class Raw(BaseSchema):
"""
Fine-tuning global parameters of DNS resolver operation.
PolicyFlagEnum,
TimeUnit,
)
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class FilterSchema(SchemaNode):
+class FilterSchema(BaseSchema):
"""
Query filtering configuration.
qtype: Optional[DNSRecordTypeEnum] = None
-class AnswerSchema(SchemaNode):
+class AnswerSchema(BaseSchema):
"""
Configuration of custom resource record for DNS answer.
nodata: bool = False
-class ForwardServerSchema(SchemaNode):
+class ForwardServerSchema(BaseSchema):
"""
Configuration of Forward server.
)
-class ActionSchema(SchemaNode):
+class ActionSchema(BaseSchema):
"""
Configuration of policy action.
_validate_policy_action(self)
-class PolicySchema(SchemaNode):
+class PolicySchema(BaseSchema):
"""
Configuration of policy rule.
from typing import List, Optional
from knot_resolver_manager.datamodel.types import CheckedPath, PolicyActionEnum, PolicyFlagEnum
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class RPZSchema(SchemaNode):
+class RPZSchema(BaseSchema):
"""
Configuration or Response Policy Zone (RPZ).
from typing_extensions import Literal
from knot_resolver_manager.datamodel.policy_schema import ActionSchema
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class SliceSchema(SchemaNode):
+class SliceSchema(BaseSchema):
"""
Split the entire DNS namespace into distinct slices.
from typing import Dict, List, Optional
from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, IPAddress, TimeUnit
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class StaticHintsSchema(SchemaNode):
+class StaticHintsSchema(BaseSchema):
"""
Static hints for forward records (A/AAAA) and reverse records (PTR)
from typing import List, Optional, Union
from knot_resolver_manager.datamodel.types import DomainName, IPAddressOptionalPort, PolicyFlagEnum
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class StubServerSchema(SchemaNode):
+class StubServerSchema(BaseSchema):
"""
Configuration of Stub server.
address: IPAddressOptionalPort
-class StubZoneSchema(SchemaNode):
+class StubZoneSchema(BaseSchema):
"""
Configuration of Stub Zone.
import re
from typing import Any, Dict, Pattern, Type
-from knot_resolver_manager.utils.modeling import CustomValueType
+from knot_resolver_manager.utils.modeling import BaseCustomType
from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
-class IntBase(CustomValueType):
+class IntBase(BaseCustomType):
"""
Base class to work with integer value.
"""
return {"type": "integer"}
-class StrBase(CustomValueType):
+class StrBase(BaseCustomType):
"""
Base class to work with string value.
"""
from typing import Any, Dict, Optional, Type, Union
from knot_resolver_manager.datamodel.types.base_types import IntRangeBase, PatternBase, StrBase, UnitBase
-from knot_resolver_manager.utils.modeling import CustomValueType
+from knot_resolver_manager.utils.modeling import BaseCustomType
from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
)
-class IPv4Address(CustomValueType):
+class IPv4Address(BaseCustomType):
_value: ipaddress.IPv4Address
def __init__(self, source_value: Any, object_path: str = "/") -> None:
}
-class IPv6Address(CustomValueType):
+class IPv6Address(BaseCustomType):
_value: ipaddress.IPv6Address
def __init__(self, source_value: Any, object_path: str = "/") -> None:
IPAddress = Union[IPv4Address, IPv6Address]
-class IPNetwork(CustomValueType):
+class IPNetwork(BaseCustomType):
_value: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
def __init__(self, source_value: Any, object_path: str = "/") -> None:
}
-class IPv6Network96(CustomValueType):
+class IPv6Network96(BaseCustomType):
_value: ipaddress.IPv6Network
def __init__(self, source_value: Any, object_path: str = "/") -> None:
return {"type": "string"}
-class UncheckedPath(CustomValueType):
+class UncheckedPath(BaseCustomType):
"""
Wrapper around pathlib.Path object. Can represent pretty much any Path. No checks are
performed on the value. The value is taken as is.
from typing import List, Optional
from knot_resolver_manager.datamodel.types import IPNetwork, PolicyFlagEnum
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class ViewSchema(SchemaNode):
+class ViewSchema(BaseSchema):
"""
Configuration parameters that allow you to create personalized policy rules and other.
from typing import Optional
from knot_resolver_manager.datamodel.types import CheckedPath, InterfacePort
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
-class WebmgmtSchema(SchemaNode):
+class WebmgmtSchema(BaseSchema):
"""
Configuration of legacy web management endpoint.
## Creating schema
-Schema is created using `SchemaNode` class. Schema structure is specified using annotations.
+Schema is created using `BaseSchema` class. Schema structure is specified using annotations.
```python
-from .modeling import SchemaNode
+from .modeling import BaseSchema
-class SimpleSchema(SchemaNode):
+class SimpleSchema(BaseSchema):
integer: int = 5 # a default value can be specified
string: str
boolean: bool
```python
from typing import Dict, List, Optional, Union
-class ComplexSchema(SchemaNode):
+class ComplexSchema(BaseSchema):
optional: Optional[str] # this field is optional
union: Union[int, str] # integer and string are both valid
list: List[int] # list of integers
`ValueError` exception should be raised in case of validation error.
```python
-class FieldsSchema(SchemaNode):
+class FieldsSchema(BaseSchema):
field1: int
field2: int
In this example, the `Layer2Schema` is structure for input data and `Layer1Schema` is for result data.
```python
-class Layer1Schema(SchemaNode):
- class Layer2Schema(SchemaNode):
+class Layer1Schema(BaseSchema):
+ class Layer2Schema(BaseSchema):
value: Union[str, int]
_LAYER = Layer2Schema
Created schema can be documented using simple docstring. Json schema is created by calling `json_schema()` method on schema class. JSON schema includes description from docstring, defaults, etc.
```python
-SimpleSchema(SchemaNode):
+SimpleSchema(BaseSchema):
"""
This is description for SimpleSchema itself.
## Creating custom type
-Custom types can be made by extending `CustomValueType` class which is integrated to parsing and validating process.
+Custom types can be made by extending `BaseCustomType` class which is integrated to parsing and validating process.
Use `DataValidationError` to rase exception during validation. `object_path` is used to track node in more complex/nested schemas and create useful logging message.
```python
-from .modeling import CustomValueType
+from .modeling import BaseCustomType
from .modeling.exceptions import DataValidationError
-class IntNonNegative(CustomValueType):
+class IntNonNegative(BaseCustomType):
def __init__(self, source_value: Any, object_path: str = "/") -> None:
super().__init__(source_value)
if isinstance(source_value, int) and not isinstance(source_value, bool):
-from .custom_value_type import CustomValueType
-from .parsed_tree import ParsedTree, parse, parse_json, parse_yaml
-from .schema_node import SchemaNode
+from .base_custom_type import BaseCustomType
+from .base_schema import BaseSchema
+from .parsing import ParsedTree, parse, parse_json, parse_yaml
__all__ = [
- "CustomValueType",
- "SchemaNode",
+ "BaseCustomType",
+ "BaseSchema",
"ParsedTree",
"parse",
"parse_yaml",
from typing import Any, Dict, Type
-class CustomValueType:
+class BaseCustomType:
"""
Subclasses of this class can be used as type annotations in 'DataParser'. When a value
is being parsed from a serialized format (e.g. JSON/YAML), an object will be created by
Example:
```
class A(DataParser):
- field: MyCustomValueType
+ field: MyBaseCustomType
- A.from_json('{"field": "value"}') == A(field=MyCustomValueType("value"))
+ A.from_json('{"field": "value"}') == A(field=MyBaseCustomType("value"))
```
There is no validation done on the wrapped value. The only condition is that
pass
def __int__(self) -> int:
- raise NotImplementedError("CustomValueType return 'int()' value is not implemented.")
+ raise NotImplementedError("BaseCustomType return 'int()' value is not implemented.")
def __str__(self) -> str:
- raise NotImplementedError("CustomValueType return 'str()' value is not implemented.")
+ raise NotImplementedError("BaseCustomType return 'str()' value is not implemented.")
def serialize(self) -> Any:
"""
raise NotImplementedError(f"{type(self).__name__}'s' 'to_dict()' not implemented.")
@classmethod
- def json_schema(cls: Type["CustomValueType"]) -> Dict[Any, Any]:
+ def json_schema(cls: Type["BaseCustomType"]) -> Dict[Any, Any]:
raise NotImplementedError()
from knot_resolver_manager.utils.functional import all_matches
-from .custom_value_type import CustomValueType
+from .base_custom_type import BaseCustomType
from .exceptions import AggregateDataValidationError, DataDescriptionError, DataValidationError
-from .parsed_tree import ParsedTree
+from .parsing import ParsedTree
from .types import (
NoneType,
get_generic_type_argument,
or is_dict(typ)
or is_list(typ)
or (inspect.isclass(typ) and issubclass(typ, Serializable))
- or (inspect.isclass(typ) and issubclass(typ, CustomValueType))
- or (inspect.isclass(typ) and issubclass(typ, SchemaNode))
+ or (inspect.isclass(typ) and issubclass(typ, BaseCustomType))
+ or (inspect.isclass(typ) and issubclass(typ, BaseSchema))
or (is_optional(typ) and Serializable.is_serializable(get_optional_inner_type(typ)))
or (is_union(typ) and all_matches(Serializable.is_serializable, get_generic_type_arguments(typ)))
)
if isinstance(obj, Serializable):
return obj.to_dict()
- elif isinstance(obj, CustomValueType):
+ elif isinstance(obj, BaseCustomType):
return obj.serialize()
elif isinstance(obj, list):
def _parse_attrs_docstrings(docstring: str) -> Optional[Dict[str, str]]:
"""
- Given a docstring of a SchemaNode, return a dict with descriptions of individual attributes.
+ Given a docstring of a BaseSchema, return a dict with descriptions of individual attributes.
"""
_, attrs_doc = _split_docstring(docstring)
def _describe_type(typ: Type[Any]) -> Dict[Any, Any]:
# pylint: disable=too-many-branches
- if inspect.isclass(typ) and issubclass(typ, SchemaNode):
+ if inspect.isclass(typ) and issubclass(typ, BaseSchema):
return typ.json_schema(include_schema_definition=False)
- elif inspect.isclass(typ) and issubclass(typ, CustomValueType):
+ elif inspect.isclass(typ) and issubclass(typ, BaseCustomType):
return typ.json_schema()
elif is_none_type(typ):
elif is_dict(typ):
key, val = get_generic_type_arguments(typ)
- if inspect.isclass(key) and issubclass(key, CustomValueType):
+ if inspect.isclass(key) and issubclass(key, BaseCustomType):
assert (
- key.__str__ is not CustomValueType.__str__
- ), "To support derived 'CustomValueType', __str__ must be implemented."
+ key.__str__ is not BaseCustomType.__str__
+ ), "To support derived 'BaseCustomType', __str__ must be implemented."
else:
assert key == str, "We currently do not support any other keys then strings"
# int
elif cls == int:
# we don't want to make an int out of anything else than other int
- # except for CustomValueType class instances
- if is_obj_type(obj, int) or isinstance(obj, CustomValueType):
+ # except for BaseCustomType class instances
+ if is_obj_type(obj, int) or isinstance(obj, BaseCustomType):
return int(obj)
raise DataValidationError(f"expected int, found {type(obj)}", object_path)
# str
elif cls == str:
# we are willing to cast any primitive value to string, but no compound values are allowed
- if is_obj_type(obj, (str, float, int)) or isinstance(obj, CustomValueType):
+ if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseCustomType):
return str(obj)
elif is_obj_type(obj, bool):
raise DataValidationError(
elif is_obj_type(obj, cls):
return obj
- # CustomValueType subclasses
- elif inspect.isclass(cls) and issubclass(cls, CustomValueType):
+ # BaseCustomType subclasses
+ elif inspect.isclass(cls) and issubclass(cls, BaseCustomType):
if isinstance(obj, cls):
# if we already have a custom value type, just pass it through
return obj
# no validation performed, the implementation does it in the constuctor
return cls(obj, object_path=object_path)
- # nested SchemaNode subclasses
- elif inspect.isclass(cls) and issubclass(cls, SchemaNode):
+ # nested BaseSchema subclasses
+ elif inspect.isclass(cls) and issubclass(cls, BaseSchema):
# we should return DataParser, we expect to be given a dict,
# because we can construct a DataParser from it
- if isinstance(obj, (dict, SchemaNode)):
+ if isinstance(obj, (dict, BaseSchema)):
return cls(obj, object_path=object_path) # type: ignore
- raise DataValidationError(f"expected 'dict' or 'SchemaNode' object, found '{type(obj)}'", object_path)
+ raise DataValidationError(f"expected 'dict' or 'BaseSchema' object, found '{type(obj)}'", object_path)
# if the object matches, just pass it through
elif inspect.isclass(cls) and isinstance(obj, cls):
)
-TSource = Union[NoneType, ParsedTree, "SchemaNode", Dict[str, Any]]
+TSource = Union[NoneType, ParsedTree, "BaseSchema", Dict[str, Any]]
def _create_untouchable(name: str) -> object:
return _Untouchable()
-class SchemaNode(Serializable):
+class BaseSchema(Serializable):
"""
Class for modelling configuration schema. It somewhat resembles standard dataclasses with additional
functionality:
* data conversion
To create an instance of this class, you have to provide source data in the form of dict-like object.
- Generally, we expect `ParsedTree`, raw dict or another `SchemaNode` instance. The provided data object
+ Generally, we expect `ParsedTree`, raw dict or another `BaseSchema` instance. The provided data object
is traversed, transformed and validated before assigned to the appropriate fields.
Fields (attributes)
The fields (or attributes) of the class are defined the same way as in a dataclass by creating a class-level
type-annotated fields. An example of that is:
- class A(SchemaNode):
+ class A(BaseSchema):
awesome_number: int
- If your `SchemaNode` instance has a field with type of a SchemaNode, its value is recursively created
- from the nested input data. This way, you can specify a complex tree of SchemaNode's and use the root
- SchemaNode to create instance of everything.
+ If your `BaseSchema` instance has a field with type of a BaseSchema, its value is recursively created
+ from the nested input data. This way, you can specify a complex tree of BaseSchema's and use the root
+ BaseSchema to create instance of everything.
Transformation
==============
- You can provide the SchemaNode class with a field and a function with the same name, but starting with
+ You can provide the BaseSchema class with a field and a function with the same name, but starting with
underscore ('_'). For example, you could have field called `awesome_number` and function called
`_awesome_number(self, source)`. The function takes one argument - the source data (optionally with self,
but you are not supposed to touch that). It can read any data from the source object and return a value of
Using this, you can convert any input values into any type and field you want. To make the conversion easier
to write, you could also specify a special class variable called `_LAYER` pointing to another
- SchemaNode class. This causes the source object to be first parsed as the specified SchemaNode and after that
+ BaseSchema class. This causes the source object to be first parsed as the specified BaseSchema and after that
used a source for this class. This therefore allows nesting of transformation functions.
Validation
See tests/utils/test_modelling.py for example usage.
"""
- _LAYER: Optional[Type["SchemaNode"]] = None
+ _LAYER: Optional[Type["BaseSchema"]] = None
def _assign_default(self, name: str, python_type: Any, object_path: str) -> None:
cls = self.__class__
value = _validated_object_type(python_type, value, object_path=f"{object_path}/{name}")
setattr(self, name, value)
- def _assign_fields(self, source: Union[ParsedTree, "SchemaNode", NoneType], object_path: str) -> Set[str]:
+ def _assign_fields(self, source: Union[ParsedTree, "BaseSchema", NoneType], object_path: str) -> Set[str]:
"""
Order of assignment:
1. all direct assignments
source = ParsedTree(source)
# save source
- self._source: Union[ParsedTree, SchemaNode] = source
+ self._source: Union[ParsedTree, BaseSchema] = source
- # construct lower level schema node first if configured to do so
+ # construct lower level schema first if configured to do so
if self._LAYER is not None:
source = self._LAYER(source, object_path=object_path) # pylint: disable=not-callable
used_keys = self._assign_fields(source, object_path)
# check for unused keys in the source object
- if source and not isinstance(source, SchemaNode):
+ if source and not isinstance(source, BaseSchema):
unused = source.keys() - used_keys
if len(unused) > 0:
keys = ", ".join((f"'{u}'" for u in unused))
raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path) from e
def get_unparsed_data(self) -> ParsedTree:
- if isinstance(self._source, SchemaNode):
+ if isinstance(self._source, BaseSchema):
return self._source.get_unparsed_data()
else:
return self._source
return True
@classmethod
- def json_schema(cls: Type["SchemaNode"], include_schema_definition: bool = True) -> Dict[Any, Any]:
+ def json_schema(cls: Type["BaseSchema"], include_schema_definition: bool = True) -> Dict[Any, Any]:
if cls._LAYER is not None:
return cls._LAYER.json_schema(include_schema_definition=include_schema_definition)
SizeUnit,
TimeUnit,
)
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
def test_parsing_units():
- class TestSchema(SchemaNode):
+ class TestSchema(BaseSchema):
size: SizeUnit
time: TimeUnit
def test_checked_path():
- class TestSchema(SchemaNode):
+ class TestSchema(BaseSchema):
p: CheckedPath
assert str(TestSchema({"p": "/tmp"}).p) == "/tmp"
from pytest import raises
from typing_extensions import Literal
-from knot_resolver_manager.utils.modeling import SchemaNode, parse_json, parse_yaml
+from knot_resolver_manager.utils.modeling import BaseSchema, parse_json, parse_yaml
from knot_resolver_manager.utils.modeling.exceptions import DataDescriptionError, DataValidationError
-class _TestBool(SchemaNode):
+class _TestBool(BaseSchema):
v: bool
-class _TestInt(SchemaNode):
+class _TestInt(BaseSchema):
v: int
-class _TestStr(SchemaNode):
+class _TestStr(BaseSchema):
v: str
@pytest.mark.parametrize("typ,val", [(_TestInt, 5), (_TestBool, False), (_TestStr, "test")])
-def test_parsing_nested(typ: Type[SchemaNode], val: Any):
- class UpperSchema(SchemaNode):
+def test_parsing_nested(typ: Type[BaseSchema], val: Any):
+ class UpperSchema(BaseSchema):
l: typ
yaml = f"""
def test_parsing_simple_compound_types():
- class TestSchema(SchemaNode):
+ class TestSchema(BaseSchema):
l: List[int]
d: Dict[str, str]
t: Tuple[str, int]
def test_parsing_nested_compound_types():
- class TestSchema(SchemaNode):
+ class TestSchema(BaseSchema):
i: int
o: Optional[Dict[str, str]]
def test_partial_mutations():
- class InnerSchema(SchemaNode):
+ class InnerSchema(BaseSchema):
size: int = 5
- class ConfPreviousSchema(SchemaNode):
+ class ConfPreviousSchema(BaseSchema):
workers: Union[Literal["auto"], int] = 1
lua_config: Optional[str] = None
inner: InnerSchema = InnerSchema()
- class ConfSchema(SchemaNode):
+ class ConfSchema(BaseSchema):
_LAYER = ConfPreviousSchema
workers: int
def test_dash_conversion():
- class TestSchema(SchemaNode):
+ class TestSchema(BaseSchema):
awesome_field: Dict[str, str]
yaml = """
def test_eq():
- class B(SchemaNode):
+ class B(BaseSchema):
a: _TestInt
field: str
def test_docstring_parsing_valid():
- class NormalDescription(SchemaNode):
+ class NormalDescription(BaseSchema):
"""
Does nothing special
Really
desc = NormalDescription.json_schema()
assert desc["description"] == "Does nothing special\nReally"
- class FieldsDescription(SchemaNode):
+ class FieldsDescription(BaseSchema):
"""
This is an awesome test class
---
assert schema["properties"]["field"]["description"] == "This field does nothing interesting"
assert schema["properties"]["value"]["description"] == "Neither does this"
- class NoDescription(SchemaNode):
+ class NoDescription(BaseSchema):
nothing: str
_ = NoDescription.json_schema()
def test_docstring_parsing_invalid():
- class AdditionalItem(SchemaNode):
+ class AdditionalItem(BaseSchema):
"""
This class is wrong
---
with raises(DataDescriptionError):
_ = AdditionalItem.json_schema()
- class WrongDescription(SchemaNode):
+ class WrongDescription(BaseSchema):
"""
This class is wrong
---
import pytest
from typing_extensions import Literal
-from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling import BaseSchema
from knot_resolver_manager.utils.modeling.types import is_list, is_literal
types = [
Dict[Any, Any],
Tuple[Any, Any],
Union[str, int],
- SchemaNode,
+ BaseSchema,
]
literal_types = [Literal[5], Literal["test"], Literal[False]]