trust_anchor_signal_query: bool
time_skew_detection: bool
keep_removed: int
- refresh_time: Optional[int]
- hold_down_time: int
+ refresh_time: Optional[TimeUnit]
+ hold_down_time: TimeUnit
trust_anchors: Optional[List[str]]
negative_trust_anchors: Optional[List[str]]
from typing import Optional
+from knot_resolver_manager.exceptions import ValidationException
from knot_resolver_manager.utils import DataParser, DataValidator
-from knot_resolver_manager.utils.exceptions import DataValidationException
class Lua(DataParser):
def _validate(self) -> None:
if self.script and self.script_file:
- raise DataValidationException("'lua.script' and 'lua.script-file' are both defined, only one can be used")
+ raise ValidationException("'lua.script' and 'lua.script-file' are both defined, only one can be used")
{{ "modules.unload('detect_time_skew')" if not cfg.dnssec.time_skew_detection }}
trust_anchors.keep_removed = {{ cfg.dnssec.keep_removed }}
-{{ "trust_anchors.refresh_time = "+cfg.dnssec.refresh_time|string if cfg.dnssec.refresh_time }}
+{{ "trust_anchors.refresh_time = "+cfg.dnssec.refresh_time.seconds()|string if cfg.dnssec.refresh_time }}
-- dnssec.trust-anchors
{% if cfg.dnssec.trust_anchors %}
class PredictionStrict(DataValidator):
- window: int
+ window: TimeUnit
period: int
def _validate(self) -> None:
from typing_extensions import Literal
-from knot_resolver_manager.utils import DataParser, DataValidationException, DataValidator
+from knot_resolver_manager.exceptions import ValidationException
+from knot_resolver_manager.utils import DataParser, DataValidator
from knot_resolver_manager.utils.types import LiteralEnum
logger = logging.getLogger(__name__)
)
cpus = os.cpu_count()
if cpus is None:
- raise DataValidationException(
+ raise ValidationException(
"The number of available CPUs to automatically set the number of running"
"'kresd' workers could not be determined."
"The number can be specified manually in 'server:instances' configuration option."
return obj.hostname
elif obj.hostname is None:
return socket.gethostname()
- raise DataValidationException(f"Unexpected value for 'server.hostname': {obj.workers}")
+ raise ValueError(f"Unexpected value for 'server.hostname': {obj.workers}")
def _workers(self, obj: Server) -> int:
if isinstance(obj.workers, int):
return obj.workers
elif obj.workers == "auto":
return _cpu_count()
- raise DataValidationException(f"Unexpected value for 'server.workers': {obj.workers}")
+ raise ValueError(f"Unexpected value for 'server.workers': {obj.workers}")
def _validate(self) -> None:
if self.workers < 0:
- raise DataValidationException("Number of workers must be non-negative")
+ raise ValueError("Number of workers must be non-negative")
from pathlib import Path
from typing import Any, Dict, Optional, Pattern, Union
-from knot_resolver_manager.utils import CustomValueType, DataValidationException
+from knot_resolver_manager.exceptions import DataValidationException
+from knot_resolver_manager.utils import CustomValueType
from knot_resolver_manager.utils.data_parser_validator import DataParser, DataValidator
logger = logging.getLogger(__name__)
_re: Pattern[str]
_units: Dict[Optional[str], int]
- def __init__(self, source_value: Any) -> None:
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
super().__init__(source_value)
self._value: int
self._value_orig: Union[str, int]
if grouped:
val, unit = grouped.groups()
if unit is None:
- raise DataValidationException(f"Missing units. Accepted units are {list(type(self)._units.keys())}")
+ raise DataValidationException(
+ f"Missing units. Accepted units are {list(type(self)._units.keys())}", object_path
+ )
elif unit not in type(self)._units:
raise DataValidationException(
f"Used unexpected unit '{unit}' for {type(self).__name__}."
- f" Accepted units are {list(type(self)._units.keys())}"
+ f" Accepted units are {list(type(self)._units.keys())}",
+ object_path,
)
self._value = int(val) * type(self)._units[unit]
else:
- raise DataValidationException(f"{type(self._value)} Failed to convert: {self}")
+ raise DataValidationException(f"{type(self._value)} Failed to convert: {self}", object_path)
elif isinstance(source_value, int):
raise DataValidationException(
"We do not accept number without units."
- f" Please convert the value to string an add a unit - {list(type(self)._units.keys())}"
+ f" Please convert the value to string and add a unit - {list(type(self)._units.keys())}",
+ object_path,
)
+ elif isinstance(source_value, type(self)):
+ self._value_orig = source_value._value_orig
+ self._value = source_value._value
else:
raise DataValidationException(
f"Unexpected input type for Unit type - {type(source_value)}."
- " Cause might be invalid format or invalid type."
+ " Cause might be invalid format or invalid type.",
+ object_path,
)
def __int__(self) -> int:
"""
return str(self._value_orig)
+ def __repr__(self) -> str:
+ return f"Unit[{type(self).__name__},{self._value_orig}]"
+
def __eq__(self, o: object) -> bool:
"""
Two instances are equal when they represent the same size
_re = re.compile(r"^(\d+)\s{0,1}([smhd]s?){0,1}$")
_units = {"ms": 1, "s": 1000, "m": 60 * 1000, "h": 3600 * 1000, "d": 24 * 3600 * 1000}
- def seconds(self):
+ def seconds(self) -> int:
return self._value // 1000
- def millis(self):
+ def millis(self) -> int:
return self._value
class AnyPath(CustomValueType):
- def __init__(self, source_value: Any) -> None:
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
super().__init__(source_value)
if isinstance(source_value, AnyPath):
self._value = source_value._value
self._value: Path = Path(source_value)
else:
raise DataValidationException(
- f"Expected file path in a string, got '{source_value}' with type '{type(source_value)}'"
+ f"Expected file path in a string, got '{source_value}' with type '{type(source_value)}'", object_path
)
try:
self._value = self._value.resolve(strict=False)
except RuntimeError as e:
- raise DataValidationException("Failed to resolve given file path. Is there a symlink loop?") from e
+ raise DataValidationException(
+ "Failed to resolve given file path. Is there a symlink loop?", object_path
+ ) from e
def __str__(self) -> str:
return str(self._value)
elif present == {"interface", ...}:
return ListenType.INTERFACE
else:
- raise DataValidationException(
+ raise ValueError(
"Listen configuration contains multiple incompatible options at once. "
"You can use (IP and PORT) or (UNIX_SOCKET) or (INTERFACE)."
)
if origin.port is None:
return None
if not 0 <= origin.port <= 65_535:
- raise DataValidationException(f"Port value {origin.port} out of range of usual 2-byte port value")
+ raise ValueError(f"Port value {origin.port} out of range of usual 2-byte port value")
return origin.port
def _ip(self, origin: Listen):
if origin.ip is None:
return None
- try:
- return ipaddress.ip_address(origin.ip)
- except ValueError as e:
- raise DataValidationException(f"Failed to parse IP address from '{origin.ip}'") from e
+ # throws ValueError, so that it gets caught outside of this function
+ return ipaddress.ip_address(origin.ip)
def _validate(self) -> None:
# we already check that there is only one option in the `_typ` method
class IPNetwork(CustomValueType):
- def __init__(self, source_value: Any) -> None:
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
super().__init__(source_value)
if isinstance(source_value, str):
try:
self._value: Union[ipaddress.IPv4Network, ipaddress.IPv6Network] = ipaddress.ip_network(source_value)
except ValueError as e:
- raise DataValidationException("Failed to parse IP network.") from e
+ raise DataValidationException("Failed to parse IP network.", object_path) from e
else:
raise DataValidationException(
f"Unexpected value for a network subnet. Expected string, got '{source_value}'"
- " with type '{type(source_value)}'"
+ " with type '{type(source_value)}'",
+ object_path,
)
def to_std(self) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network]:
class IPv6Network96(CustomValueType):
- def __init__(self, source_value: Any) -> None:
- super().__init__(source_value)
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
+ super().__init__(source_value, object_path=object_path)
if isinstance(source_value, str):
try:
self._value: ipaddress.IPv6Network = ipaddress.IPv6Network(source_value)
except ValueError as e:
- raise DataValidationException("Failed to parse IPv6 /96 network.") from e
+ raise DataValidationException("Failed to parse IPv6 /96 network.", object_path) from e
if self._value.prefixlen != 96:
raise DataValidationException(
"Expected IPv6 network address with /96 prefix length."
- f" Got prefix lenght of {self._value.prefixlen}"
+ f" Got prefix length of {self._value.prefixlen}",
+ object_path,
)
else:
raise DataValidationException(
"Unexpected value for a network subnet."
- f" Expected string, got '{source_value}' with type '{type(source_value)}'"
+ f" Expected string, got '{source_value}' with type '{type(source_value)}'",
+ object_path,
)
def __str__(self) -> str:
-class SubprocessControllerException(Exception):
+class KresdManagerException(Exception):
+ """
+ Base class for all custom exceptions we use in our code
+ """
+
+
+class SubprocessControllerException(KresdManagerException):
+ pass
+
+
+class TreeException(KresdManagerException):
+ def __init__(self, msg: str, tree_path: str) -> None:
+ super().__init__(msg)
+ self._tree_path = tree_path
+
+ def where(self) -> str:
+ return self._tree_path
+
+
+class DataParsingException(TreeException):
+ pass
+
+
+class DataValidationException(TreeException):
+ pass
+
+
+class ParsingException(KresdManagerException):
+ pass
+
+
+class ValidationException(KresdManagerException):
pass
from knot_resolver_manager import kres_id
from knot_resolver_manager.compat.asyncio import create_task
from knot_resolver_manager.constants import KRESD_CONFIG_FILE, WATCHDOG_INTERVAL
+from knot_resolver_manager.exceptions import KresdManagerException
from knot_resolver_manager.kresd_controller.interface import (
Subprocess,
SubprocessController,
SubprocessStatus,
SubprocessType,
)
-from knot_resolver_manager.utils import DataValidationException
from knot_resolver_manager.utils.async_utils import writefile
from .datamodel import KresConfig, KresConfigStrict
last = self.get_last_used_config_strict()
if last is not None:
await self._write_config(last)
- raise DataValidationException("Canary kresd instance failed. Config is invalid.")
+ raise KresdManagerException("Canary kresd instance failed. Config is invalid.")
logger.debug("Canary process test passed, Applying new config to all workers")
self._last_used_config = config
from aiohttp.web_response import json_response
from knot_resolver_manager.constants import MANAGER_CONFIG_FILE
+from knot_resolver_manager.exceptions import KresdManagerException, ParsingException, TreeException, ValidationException
from knot_resolver_manager.kresd_controller import get_controller_by_name
from knot_resolver_manager.kresd_controller.interface import SubprocessController
-from knot_resolver_manager.utils import DataValidationException, Format
+from knot_resolver_manager.utils import Format
from knot_resolver_manager.utils.async_utils import readfile
from .datamodel import KresConfig
try:
return await handler(request)
- except DataValidationException as e:
- logger.error("Failed to parse given data in API request", exc_info=True)
- return web.Response(text=f"Data validation failed: {e}", status=HTTPStatus.BAD_REQUEST)
+ except KresdManagerException as e:
+ if isinstance(e, TreeException):
+ return web.Response(
+ text=f"Configuration validation failed @ '{e.where()}': {e}", status=HTTPStatus.BAD_REQUEST
+ )
+ elif isinstance(e, (ParsingException, ValidationException)):
+ return web.Response(text=f"Configuration validation failed: {e}", status=HTTPStatus.BAD_REQUEST)
+ else:
+ logger.error("Request processing failed", exc_info=True)
+ return web.Response(text=f"Request processing failed: {e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
def setup_routes(app: web.Application):
from .custom_types import CustomValueType
from .data_parser_validator import DataParser, DataValidator, Format
-from .exceptions import DataParsingException, DataValidationException
-from .overload import Overloaded
T = TypeVar("T")
__all__ = [
- "ignore_exceptions_optional",
- "ignore_exceptions",
"Format",
"CustomValueType",
"DataParser",
"DataValidator",
- "DataParsingException",
- "DataValidationException",
- "Overloaded",
]
raise a `DataValidationException` in case of errors.
"""
- def __init__(self, source_value: Any) -> None:
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
pass
def __int__(self) -> int:
from yaml.constructor import ConstructorError
from yaml.nodes import MappingNode
+from knot_resolver_manager.exceptions import (
+ DataParsingException,
+ DataValidationException,
+ ParsingException,
+ ValidationException,
+)
from knot_resolver_manager.utils.custom_types import CustomValueType
-from knot_resolver_manager.utils.exceptions import DataParsingException
from knot_resolver_manager.utils.types import (
get_attr_type,
get_generic_type_argument,
return obj
-def _validated_object_type(cls: Type[Any], obj: Any, default: Any = ..., use_default: bool = False) -> Any:
+def _validated_object_type(
+ cls: Type[Any], obj: Any, default: Any = ..., use_default: bool = False, object_path: str = "/"
+) -> Any:
"""
Given an expected type `cls` and a value object `obj`, validate the type of `obj` and return it
"""
if obj is None:
return None
else:
- raise DataParsingException(f"Expected None, found '{obj}'.")
+ raise DataParsingException(f"Expected None, found '{obj}'.", object_path)
# Union[*variants] (handles Optional[T] due to the way the typing system works)
elif is_union(cls):
variants = get_generic_type_arguments(cls)
for v in variants:
try:
- return _validated_object_type(v, obj)
+ return _validated_object_type(v, obj, object_path=object_path)
except DataParsingException:
pass
- raise DataParsingException(f"Union {cls} could not be parsed - parsing of all variants failed.")
+ raise DataParsingException(f"Union {cls} could not be parsed - parsing of all variants failed.", object_path)
# after this, there is no place for a None object
elif obj is None:
- raise DataParsingException(f"Unexpected None value for type {cls}")
+ raise DataParsingException(f"Unexpected None value for type {cls}", object_path)
# int
elif cls == int:
# except for CustomValueType class instances
if is_obj_type(obj, int) or isinstance(obj, CustomValueType):
return int(obj)
- raise DataParsingException(f"Expected int, found {type(obj)}")
+ raise DataParsingException(f"Expected int, found {type(obj)}", object_path)
# str
elif cls == str:
raise DataParsingException(
"Expected str, found bool. Be careful, that YAML parsers consider even"
' "no" and "yes" as a bool. Search for the Norway Problem for more'
- " details. And please use quotes explicitly."
+ " details. And please use quotes explicitly.",
+ object_path,
)
else:
raise DataParsingException(
- f"Expected str (or number that would be cast to string), but found type {type(obj)}"
+ f"Expected str (or number that would be cast to string), but found type {type(obj)}", object_path
)
# bool
if is_obj_type(obj, bool):
return obj
else:
- raise DataParsingException(f"Expected bool, found {type(obj)}")
+ raise DataParsingException(f"Expected bool, found {type(obj)}", object_path)
# float
elif cls == float:
if obj == expected:
return obj
else:
- raise DataParsingException(f"Literal {cls} is not matched with the value {obj}")
+ raise DataParsingException(f"Literal {cls} is not matched with the value {obj}", object_path)
# Dict[K,V]
elif is_dict(cls):
key_type, val_type = get_generic_type_arguments(cls)
try:
return {
- _validated_object_type(key_type, key): _validated_object_type(val_type, val) for key, val in obj.items()
+ _validated_object_type(key_type, key, object_path=f"{object_path} @ key {key}"): _validated_object_type(
+ val_type, val, object_path=f"{object_path} @ value for key {key}"
+ )
+ for key, val in obj.items()
}
except AttributeError as e:
raise DataParsingException(
- f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", e
- )
+ f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
+ ) from e
# any Enums (probably used only internally in DataValidator)
elif is_enum(cls):
if isinstance(obj, cls):
return obj
else:
- raise DataParsingException("Unexpected value '{obj}' for enum '{cls}'")
+ raise DataParsingException(f"Unexpected value '{obj}' for enum '{cls}'", object_path)
# List[T]
elif is_list(cls):
inner_type = get_generic_type_argument(cls)
- return [_validated_object_type(inner_type, val) for val in obj]
+ return [_validated_object_type(inner_type, val, object_path=f"{object_path}[]") for val in obj]
# Tuple[A,B,C,D,...]
elif is_tuple(cls):
types = get_generic_type_arguments(cls)
- return tuple(_validated_object_type(typ, val) for typ, val in zip(types, obj))
+ return tuple(_validated_object_type(typ, val, object_path=object_path) for typ, val in zip(types, obj))
# CustomValueType subclasses
elif inspect.isclass(cls) and issubclass(cls, CustomValueType):
# no validation performed, the implementation does it in the constructor
- return cls(obj)
+ return cls(obj, object_path=object_path)
# nested DataParser subclasses
elif inspect.isclass(cls) and issubclass(cls, DataParser):
# we should return DataParser, we expect to be given a dict,
# because we can construct a DataParser from it
if isinstance(obj, dict):
- return cls(obj) # type: ignore
- raise DataParsingException(f"Expected '{dict}' object, found '{type(obj)}'")
+ return cls(obj, object_path=object_path) # type: ignore
+ raise DataParsingException(f"Expected '{dict}' object, found '{type(obj)}'", object_path)
# nested DataValidator subclasses
elif inspect.isclass(cls) and issubclass(cls, DataValidator):
# we should return DataValidator, we expect to be given a DataParser,
# because we can construct a DataValidator from it
if isinstance(obj, DataParser):
- return cls(obj)
- raise DataParsingException(f"Expected instance of '{DataParser}' class, found '{type(obj)}'")
+ return cls(obj, object_path=object_path)
+ raise DataParsingException(f"Expected instance of '{DataParser}' class, found '{type(obj)}'", object_path)
# if the object matches, just pass it through
elif inspect.isclass(cls) and isinstance(obj, cls):
else:
raise DataParsingException(
f"Type {cls} cannot be parsed. This is a implementation error. "
- "Please fix your types in the class or improve the parser/validator."
+ "Please fix your types in the class or improve the parser/validator.",
+ object_path,
)
dict_out: Dict[Any, Any] = {}
for key, val in pairs:
if key in dict_out:
- raise DataParsingException(f"Duplicate attribute key detected: {key}")
+ raise ParsingException(f"Duplicate attribute key detected: {key}")
dict_out[key] = val
return dict_out
# check for duplicate keys
if key in mapping:
- raise DataParsingException(f"duplicate key detected: {key_node.start_mark}")
+ raise ParsingException(f"duplicate key detected: {key_node.start_mark}")
value = self.construct_object(value_node, deep=deep) # type: ignore
mapping[key] = value
return mapping
"text/vnd.yaml": Format.YAML,
}
if mime_type not in formats:
- raise DataParsingException("Unsupported MIME type")
+ raise ParsingException("Unsupported MIME type")
return formats[mime_type]
class DataParser:
- def __init__(self, obj: Optional[Dict[Any, Any]] = None):
+ def __init__(self, obj: Optional[Dict[Any, Any]] = None, object_path: str = "/"):
cls = self.__class__
annot = cls.__dict__.get("__annotations__", {})
use_default = hasattr(cls, name)
default = getattr(cls, name, ...)
- value = _validated_object_type(python_type, val, default, use_default)
+ value = _validated_object_type(python_type, val, default, use_default, object_path=f"{object_path}/{name}")
setattr(self, name, value)
# check for unused keys
additional_info = (
" The problem might be that you are using '_', but you should be using '-' instead."
)
- raise DataParsingException(f"Attribute '{key}' was not provided with any value." + additional_info)
+ raise DataParsingException(
+ f"Attribute '{key}' was not provided with any value." + additional_info, object_path
+ )
@classmethod
def parse_from(cls: Type[_T], fmt: Format, text: str):
# prepare and validate the path object
path = path[:-1] if path.endswith("/") else path
if re.match(_SUBTREE_MUTATION_PATH_PATTERN, path) is None:
- raise DataParsingException("Provided object path for mutation is invalid.")
+ raise ParsingException("Provided object path for mutation is invalid.")
path = path[1:] if path.startswith("/") else path
# now, the path variable should contain '/' separated field names
segment = dash_segment.replace("-", "_")
if segment == "":
- raise DataParsingException(f"Unexpectedly empty segment in path '{path}'")
+ raise ParsingException(f"Unexpectedly empty segment in path '{path}'")
elif is_internal_field(segment):
- raise DataParsingException("No, changing internal fields (starting with _) is not allowed. Nice try.")
+ raise ParsingException(
+ "No, changing internal fields (starting with _) is not allowed. Nice try though."
+ )
elif hasattr(obj, segment):
parent = obj
obj = getattr(parent, segment)
else:
- raise DataParsingException(
+ raise ParsingException(
f"Path segment '{dash_segment}' does not match any field on the provided parent object"
)
assert parent is not None
class DataValidator:
- def __init__(self, obj: DataParser):
+ def __init__(self, obj: DataParser, object_path: str = ""):
cls = self.__class__
anot = cls.__dict__.get("__annotations__", {})
# use transformation function if available
if hasattr(self, f"_{attr_name}"):
- value = getattr(self, f"_{attr_name}")(obj)
+ try:
+ value = getattr(self, f"_{attr_name}")(obj)
+ except (ValueError, ValidationException) as e:
+ if len(e.args) > 0 and isinstance(e.args[0], str):
+ msg = e.args[0]
+ else:
+ msg = "Failed to validate value type"
+ raise DataValidationException(msg, object_path) from e
elif hasattr(obj, attr_name):
value = getattr(obj, attr_name)
else:
- raise DataParsingException(f"DataParser object {obj} is missing '{attr_name}' attribute.")
+ raise DataValidationException(
+ f"DataParser object {obj} is missing '{attr_name}' attribute.", object_path
+ )
setattr(self, attr_name, _validated_object_type(attr_type, value))
+++ /dev/null
-class DataParsingException(Exception):
- pass
-
-
-class DataValidationException(Exception):
- pass
"no-else-raise", # not helpful for readability, when we want explicit branches
"raising-bad-type", # handled by type checker
"too-many-arguments", # sure, but how can we change the signatures to take less arguments? artificially create objects with arguments? That's stupid...
+ "no-member", # checked by pyright
]
[tool.pylint.SIMILARITIES]
from knot_resolver_manager.datamodel import KresConfig, KresConfigStrict
+from knot_resolver_manager.datamodel.types import TimeUnit
def test_dns64_true_default():
assert strict.dnssec.time_skew_detection == True
assert strict.dnssec.keep_removed == 0
assert strict.dnssec.refresh_time == None
- assert strict.dnssec.hold_down_time == 30 * 24 * 60 ** 2
+ assert strict.dnssec.hold_down_time == TimeUnit("30d")
assert strict.dnssec.trust_anchors == None
assert strict.dnssec.negative_trust_anchors == None
SizeUnit,
TimeUnit,
)
-from knot_resolver_manager.utils import DataParser, DataValidationException, DataValidator
+from knot_resolver_manager.exceptions import KresdManagerException
+from knot_resolver_manager.utils import DataParser, DataValidator
def test_size_unit():
assert SizeUnit("5368709120B") == SizeUnit("5242880K") == SizeUnit("5120M") == SizeUnit("5G")
- with raises(DataValidationException):
+ with raises(KresdManagerException):
SizeUnit("-5368709120B")
- with raises(DataValidationException):
+ with raises(KresdManagerException):
SizeUnit(-5368709120)
- with raises(DataValidationException):
+ with raises(KresdManagerException):
SizeUnit("5120MM")
def test_time_unit():
assert TimeUnit("1d") == TimeUnit("24h") == TimeUnit("1440m") == TimeUnit("86400s")
- with raises(DataValidationException):
+ with raises(KresdManagerException):
TimeUnit("-1")
- with raises(DataValidationException):
+ with raises(KresdManagerException):
TimeUnit(-24)
- with raises(DataValidationException):
+ with raises(KresdManagerException):
TimeUnit("1440mm")
assert TimeUnit("10ms").millis() == 10
ip: 127.0.0.1
"""
)
- with raises(DataValidationException):
+ with raises(KresdManagerException):
ListenStrict(o)
assert o.to_std().prefixlen == 24
assert o.to_std() == ipaddress.IPv4Network("10.11.12.0/24")
- with raises(DataValidationException):
+ with raises(KresdManagerException):
# because only the prefix can have non-zero bits
IPNetwork("10.11.12.13/8")
def test_ipv6_96_network():
_ = IPv6Network96("fe80::/96")
- with raises(DataValidationException):
+ with raises(KresdManagerException):
IPv6Network96("fe80::/95")
- with raises(DataValidationException):
+ with raises(KresdManagerException):
IPv6Network96("10.11.12.3/96")
assert strict.trust_anchor_signal_query == False
assert strict.time_skew_detection == False
assert strict.keep_removed == 3
- assert strict.refresh_time == 10
- assert strict.hold_down_time == 45 * 24 * 60 ** 2
+ assert strict.refresh_time == TimeUnit("10s")
+ assert strict.hold_down_time == TimeUnit("45d")
assert strict.trust_anchors == [
". 3600 IN DS 19036 8 2 49AAC11D7B6F6446702E54A1607371607A1A41855200FD2CE1CDDE32F24E8FB5"
from pytest import raises
from knot_resolver_manager.datamodel.lua_config import Lua, LuaStrict
-from knot_resolver_manager.utils.exceptions import DataValidationException
+from knot_resolver_manager.exceptions import KresdManagerException
yaml = """
script-only: true
script-file: path/to/file
"""
- with raises(DataValidationException):
+ with raises(KresdManagerException):
LuaStrict(Lua.from_yaml(yaml2))
assert strict.violators_workarounds == True
assert strict.serve_stale == True
- assert strict.prediction.window == 10 * 60
+ assert strict.prediction.window == TimeUnit("10m")
assert strict.prediction.period == 20
y = OptionsStrict(x)
assert x.prediction == True
- assert y.prediction.window == 900
+ assert y.prediction.window == TimeUnit("15m")
assert y.prediction.period == 24
from pytest import raises
from typing_extensions import Literal
-from knot_resolver_manager.utils import DataParser, DataValidationException, DataValidator, Format
-from knot_resolver_manager.utils.exceptions import DataParsingException
+from knot_resolver_manager.exceptions import DataParsingException
+from knot_resolver_manager.utils import DataParser, DataValidator, Format
def test_primitive():
def _validate(self) -> None:
if self.workers < 0:
- raise DataValidationException("Number of workers must be non-negative")
+ raise ValueError("Number of workers must be non-negative")
yaml = """
workers: auto
from typing import Optional
-from knot_resolver_manager.utils import Overloaded
+from knot_resolver_manager.utils.overload import Overloaded
def test_simple():
--- /dev/null
+"""
+This type stub file was generated by pyright.
+"""
+
+from _pytest import __version__
+from _pytest.assertion import register_assert_rewrite
+from _pytest.compat import _setup_collect_fakemodule
+from _pytest.config import ExitCode, UsageError, cmdline, hookimpl, hookspec, main
+from _pytest.debugging import pytestPDB as __pytestPDB
+from _pytest.fixtures import fillfixtures as _fillfuncargs
+from _pytest.fixtures import fixture, yield_fixture
+from _pytest.freeze_support import freeze_includes
+from _pytest.main import Session
+from _pytest.mark import MARK_GEN as mark
+from _pytest.mark import param
+from _pytest.nodes import Collector, File, Item
+from _pytest.outcomes import exit, fail, importorskip, skip, xfail
+from _pytest.python import Class, Function, Instance, Module, Package
+from _pytest.python_api import approx, raises
+from _pytest.recwarn import deprecated_call, warns
+from _pytest.warning_types import (
+ PytestAssertRewriteWarning,
+ PytestCacheWarning,
+ PytestCollectionWarning,
+ PytestConfigWarning,
+ PytestDeprecationWarning,
+ PytestExperimentalApiWarning,
+ PytestUnhandledCoroutineWarning,
+ PytestUnknownMarkWarning,
+ PytestWarning,
+)
+
+"""
+pytest: unit and functional testing with Python.
+"""
+set_trace = ...
--- /dev/null
+"""
+This type stub file was generated by pyright.
+"""
+
+"""
+pytest entry point
+"""
+if __name__ == "__main__":
+ ...