from knot_resolver_manager import compat
from knot_resolver_manager.server import start_server
-from knot_resolver_manager.utils.parsing import ParsedTree
+from knot_resolver_manager.utils.modeling import ParsedTree
class KnotManagerClient:
from knot_resolver_manager.client import KnotManagerClient
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import KresManagerException
-from knot_resolver_manager.utils.parsing import parse_yaml
+from knot_resolver_manager.utils.modeling import parse_yaml
BASE_URL = "base_url"
from typing import Any, Awaitable, Callable, List, Tuple
from knot_resolver_manager.datamodel import KresConfig
-from knot_resolver_manager.exceptions import DataException, KresManagerException
+from knot_resolver_manager.exceptions import KresManagerException
from knot_resolver_manager.utils.functional import Result
+from knot_resolver_manager.utils.modeling.exceptions import DataParsingError
VerifyCallback = Callable[[KresConfig, KresConfig], Awaitable[Result[None, str]]]
UpdateCallback = Callable[[KresConfig], Awaitable[None]]
self._verifiers.append(verifier)
res = await verifier(self.get(), self.get())
if res.is_err():
- raise DataException(f"Initial config verification failed with error: {res.unwrap_err()}")
+ raise DataParsingError(f"Initial config verification failed with error: {res.unwrap_err()}")
async def register_on_change_callback(self, callback: UpdateCallback) -> None:
"""
from typing import List, Optional
from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, SizeUnit, TimeUnit
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class PrefillSchema(SchemaNode):
from knot_resolver_manager.datamodel.types.types import IntPositive, UncheckedPath
from knot_resolver_manager.datamodel.view_schema import ViewSchema
from knot_resolver_manager.datamodel.webmgmt_schema import WebmgmtSchema
-from knot_resolver_manager.exceptions import DataException
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
logger = logging.getLogger(__name__)
_MAIN_TEMPLATE = _import_lua_template()
-def _cpu_count() -> int:
+def _cpu_count() -> Optional[int]:
try:
return len(os.sched_getaffinity(0))
except (NotImplementedError, AttributeError):
- logger.warning(
- "The number of usable CPUs could not be determined using 'os.sched_getaffinity()'."
- "Attempting to get the number of system CPUs using 'os.cpu_count()'"
- )
+ logger.warning("The number of usable CPUs could not be determined using 'os.sched_getaffinity()'.")
cpus = os.cpu_count()
if cpus is None:
- raise DataException(
- "The number of available CPUs to automatically set the number of running"
- "'kresd' workers could not be determined."
- "The number can be specified manually in 'server:instances' configuration option."
- )
+ logger.warning("The number of usable CPUs could not be determined using 'os.cpu_count()'.")
return cpus
def _workers(self, obj: Raw) -> Any:
if obj.workers == "auto":
- return IntPositive(_cpu_count())
+ count = _cpu_count()
+ if count:
+ return IntPositive(count)
+ raise ValueError(
+ "The number of available CPUs to automatically set the number of running 'kresd' workers could not be determined."
+ "The number of workers can be configured manually in 'workers' option."
+ )
return obj.workers
def _dnssec(self, obj: Raw) -> Any:
return obj.dns64
def _validate(self) -> None:
- try:
- cpu_count = _cpu_count()
- if int(self.workers) > 10 * cpu_count:
- raise ValueError("refusing to run with more then instances 10 instances per cpu core")
- except DataException:
- # sometimes, we won't be able to get information about the cpu count
- pass
+ cpu_count = _cpu_count()
+
+ if cpu_count and int(self.workers) > 10 * cpu_count:
+ raise ValueError("refusing to run with more then 10 workers per cpu core")
+ elif int(self.workers) > MAX_WORKERS:
+ raise ValueError(f"refusing to run with more workers then allowed maximum {MAX_WORKERS}")
def render_lua(self) -> str:
# FIXME the `cwd` argument is used only for configuring control socket path
from knot_resolver_manager.datamodel.types import IPv6Network96
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class Dns64Schema(SchemaNode):
from typing import List, Optional
from knot_resolver_manager.datamodel.types import IntNonNegative, TimeUnit
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class TrustAnchorFileSchema(SchemaNode):
from knot_resolver_manager.datamodel.policy_schema import ForwardServerSchema
from knot_resolver_manager.datamodel.types import DomainName, IPAddressOptionalPort, PolicyFlagEnum
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class ForwardZoneSchema(SchemaNode):
from typing_extensions import Literal, TypeAlias
from knot_resolver_manager.datamodel.types import CheckedPath, TimeUnit
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
LogLevelEnum = Literal["crit", "err", "warning", "notice", "info", "debug"]
LogTargetEnum = Literal["syslog", "stderr", "stdout"]
from typing import Optional
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class LuaSchema(SchemaNode):
from typing import Optional
from knot_resolver_manager.datamodel.types import CheckedPath, IPAddressPort
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class ManagementSchema(SchemaNode):
from typing_extensions import Literal
from knot_resolver_manager.datamodel.types import DomainName, IPAddress, PortNumber, TimeUnit
-from knot_resolver_manager.utils.modelling import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class GraphiteSchema(SchemaNode):
PortNumber,
SizeUnit,
)
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
KindEnum = Literal["dns", "xdp", "dot", "doh-legacy", "doh2"]
from typing_extensions import Literal
from knot_resolver_manager.datamodel.types import IntNonNegative, TimeUnit
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
GlueCheckingEnum = Literal["normal", "strict", "permissive"]
PolicyFlagEnum,
TimeUnit,
)
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class FilterSchema(SchemaNode):
from typing import List, Optional
from knot_resolver_manager.datamodel.types import CheckedPath, PolicyActionEnum, PolicyFlagEnum
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class RPZSchema(SchemaNode):
from typing_extensions import Literal
from knot_resolver_manager.datamodel.policy_schema import ActionSchema
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class SliceSchema(SchemaNode):
from typing import Dict, List, Optional
from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, IPAddress, TimeUnit
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class StaticHintsSchema(SchemaNode):
from typing import List, Optional, Union
from knot_resolver_manager.datamodel.types import DomainName, IPAddressOptionalPort, PolicyFlagEnum
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class StubServerSchema(SchemaNode):
import re
from typing import Any, Dict, Pattern, Type
-from knot_resolver_manager.exceptions import SchemaException
-from knot_resolver_manager.utils import CustomValueType
+from knot_resolver_manager.utils.modeling import CustomValueType
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
class IntBase(CustomValueType):
super().__init__(source_value)
if isinstance(source_value, int) and not isinstance(source_value, bool):
if hasattr(self, "_min") and (source_value < self._min):
- raise SchemaException(f"value {source_value} is lower than the minimum {self._min}.", object_path)
+ raise DataValidationError(f"value {source_value} is lower than the minimum {self._min}.", object_path)
if hasattr(self, "_max") and (source_value > self._max):
- raise SchemaException(f"value {source_value} is higher than the maximum {self._max}", object_path)
+ raise DataValidationError(f"value {source_value} is higher than the maximum {self._max}", object_path)
self._value = source_value
else:
- raise SchemaException(
+ raise DataValidationError(
f"expected integer, got '{type(source_value)}'",
object_path,
)
if type(self)._re.match(source_value):
self._value: str = source_value
else:
- raise SchemaException(f"'{source_value}' does not match '{self._re.pattern}' pattern", object_path)
+ raise DataValidationError(f"'{source_value}' does not match '{self._re.pattern}' pattern", object_path)
else:
- raise SchemaException(
+ raise DataValidationError(
f"expected string, got '{type(source_value)}'",
object_path,
)
if grouped:
val, unit = grouped.groups()
if unit is None:
- raise SchemaException(
+ raise DataValidationError(
f"Missing units. Accepted units are {list(type(self)._units.keys())}", object_path
)
elif unit not in type(self)._units:
- raise SchemaException(
+ raise DataValidationError(
f"Used unexpected unit '{unit}' for {type(self).__name__}."
f" Accepted units are {list(type(self)._units.keys())}",
object_path,
)
self._value = int(val) * type(self)._units[unit]
else:
- raise SchemaException(f"{type(self._value)} Failed to convert: {self}", object_path)
+ raise DataValidationError(f"{type(self._value)} Failed to convert: {self}", object_path)
elif isinstance(source_value, int):
- raise SchemaException(
+ raise DataValidationError(
f"number without units, please convert to string and add unit - {list(type(self)._units.keys())}",
object_path,
)
else:
- raise SchemaException(
+ raise DataValidationError(
f"expected number with units in a string, got '{type(source_value)}'.",
object_path,
)
from typing import Any, Dict, Optional, Type, Union
from knot_resolver_manager.datamodel.types.base_types import IntRangeBase, PatternBase, StrBase, UnitBase
-from knot_resolver_manager.exceptions import SchemaException
-from knot_resolver_manager.utils import CustomValueType
+from knot_resolver_manager.utils.modeling import CustomValueType
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
class IntNonNegative(IntRangeBase):
try:
return cls(int(port), object_path)
except ValueError as e:
- raise SchemaException(f"invalid port number {port}", object_path) from e
+ raise DataValidationError(f"invalid port number {port}", object_path) from e
class SizeUnit(UnitBase):
try:
punycode = source_value.encode("idna").decode("utf-8") if source_value != "." else "."
except ValueError:
- raise SchemaException(
+ raise DataValidationError(
f"conversion of '{source_value}' to IDN punycode representation failed",
object_path,
)
self._value = source_value
self._punycode = punycode
else:
- raise SchemaException(
+ raise DataValidationError(
f"'{source_value}' represented in punycode '{punycode}' does not match '{self._re.pattern}' pattern",
object_path,
)
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for '<domain-name>'."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
except ValueError as e1:
try:
self.if_name = InterfaceName(parts[0])
- except SchemaException as e2:
- raise SchemaException(
+ except DataValidationError as e2:
+ raise DataValidationError(
f"expected IP address or interface name, got '{parts[0]}'.", object_path
) from e1 and e2
self.port = PortNumber.from_str(parts[1], object_path)
else:
- raise SchemaException(
+ raise DataValidationError(
f"expected '<ip-address|interface-name>@<port>', got '{source_value}'.", object_path
)
self._value = source_value
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for '<ip-address|interface-name>@<port>'."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
except ValueError as e1:
try:
self.if_name = InterfaceName(parts[0])
- except SchemaException as e2:
- raise SchemaException(
+ except DataValidationError as e2:
+ raise DataValidationError(
f"expected IP address or interface name, got '{parts[0]}'.", object_path
) from e1 and e2
if len(parts) == 2:
self.port = PortNumber.from_str(parts[1], object_path)
else:
- raise SchemaException(f"expected '<ip-address|interface-name>[@<port>]', got '{parts}'.", object_path)
+ raise DataValidationError(
+ f"expected '<ip-address|interface-name>[@<port>]', got '{parts}'.", object_path
+ )
self._value = source_value
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for '<ip-address|interface-name>[@<port>]'."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
try:
self.addr = ipaddress.ip_address(parts[0])
except ValueError as e:
- raise SchemaException(f"failed to parse IP address '{parts[0]}'.", object_path) from e
+ raise DataValidationError(f"failed to parse IP address '{parts[0]}'.", object_path) from e
else:
- raise SchemaException(f"expected '<ip-address>@<port>', got '{source_value}'.", object_path)
+ raise DataValidationError(f"expected '<ip-address>@<port>', got '{source_value}'.", object_path)
self._value = source_value
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for '<ip-address>@<port>'."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
try:
self.addr = ipaddress.ip_address(parts[0])
except ValueError as e:
- raise SchemaException(f"failed to parse IP address '{parts[0]}'.", object_path) from e
+ raise DataValidationError(f"failed to parse IP address '{parts[0]}'.", object_path) from e
if len(parts) == 2:
self.port = PortNumber.from_str(parts[1], object_path)
else:
- raise SchemaException(f"expected '<ip-address>[@<port>]', got '{parts}'.", object_path)
+ raise DataValidationError(f"expected '<ip-address>[@<port>]', got '{parts}'.", object_path)
self._value = source_value
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for a '<ip-address>[@<port>]'."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
try:
self._value: ipaddress.IPv4Address = ipaddress.IPv4Address(source_value)
except ValueError as e:
- raise SchemaException("failed to parse IPv4 address.", object_path) from e
+ raise DataValidationError("failed to parse IPv4 address.", object_path) from e
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for a IPv4 address."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
try:
self._value: ipaddress.IPv6Address = ipaddress.IPv6Address(source_value)
except ValueError as e:
- raise SchemaException("failed to parse IPv6 address.", object_path) from e
+ raise DataValidationError("failed to parse IPv6 address.", object_path) from e
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for a IPv6 address."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
try:
self._value: Union[ipaddress.IPv4Network, ipaddress.IPv6Network] = ipaddress.ip_network(source_value)
except ValueError as e:
- raise SchemaException("failed to parse IP network.", object_path) from e
+ raise DataValidationError("failed to parse IP network.", object_path) from e
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for a network subnet."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
try:
self._value: ipaddress.IPv6Network = ipaddress.IPv6Network(source_value)
except ValueError as e:
- raise SchemaException("failed to parse IPv6 /96 network.", object_path) from e
+ raise DataValidationError("failed to parse IPv6 /96 network.", object_path) from e
if self._value.prefixlen == 128:
- raise SchemaException(
+ raise DataValidationError(
"Expected IPv6 network address with /96 prefix length."
" Submitted address has been interpreted as /128."
" Maybe, you forgot to add /96 after the base address?",
)
if self._value.prefixlen != 96:
- raise SchemaException(
+ raise DataValidationError(
"expected IPv6 network address with /96 prefix length."
f" Got prefix lenght of {self._value.prefixlen}",
object_path,
)
else:
- raise SchemaException(
+ raise DataValidationError(
"Unexpected value for a network subnet."
f" Expected string, got '{source_value}' with type '{type(source_value)}'",
object_path,
if isinstance(source_value, str):
self._value: Path = Path(source_value)
else:
- raise SchemaException(
+ raise DataValidationError(
f"expected file path in a string, got '{source_value}' with type '{type(source_value)}'.", object_path
)
try:
self._value = self._value.resolve(strict=False)
except RuntimeError as e:
- raise SchemaException("Failed to resolve given file path. Is there a symlink loop?", object_path) from e
+ raise DataValidationError("Failed to resolve given file path. Is there a symlink loop?", object_path) from e
from typing import List, Optional
from knot_resolver_manager.datamodel.types import IPNetwork, PolicyFlagEnum
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class ViewSchema(SchemaNode):
from typing import Optional
from knot_resolver_manager.datamodel.types import CheckedPath, InterfacePort
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
class WebmgmtSchema(SchemaNode):
-from typing import Iterable, List
+from typing import List
class CancelStartupExecInsteadException(Exception):
class SubprocessControllerTimeoutException(KresManagerException):
pass
-
-
-class SchemaException(KresManagerException):
- def __init__(self, msg: str, tree_path: str, child_exceptions: "Iterable[SchemaException]" = tuple()) -> None:
- super().__init__(msg)
- self._tree_path = tree_path
- self._child_exceptions = child_exceptions
-
- def where(self) -> str:
- return self._tree_path
-
- def msg(self):
- return f"[{self.where()}] " + super().__str__()
-
- def recursive_msg(self, indentation_level: int = 0) -> str:
- INDENT = indentation_level * "\t"
- msg_parts: List[str] = [f"{INDENT}{self.msg()}"]
- for c in self._child_exceptions:
- msg_parts.append(c.recursive_msg(indentation_level + 1))
- return "\n".join(msg_parts)
-
- def __str__(self) -> str:
- return self.recursive_msg()
-
-
-class AggregateSchemaException(SchemaException):
- def __init__(self, object_path: str, child_exceptions: "Iterable[SchemaException]") -> None:
- super().__init__("error due to lower level exceptions", object_path, child_exceptions)
-
- def recursive_msg(self, indentation_level: int = 0) -> str:
- inc = 0
- msg_parts: List[str] = []
- if indentation_level == 0:
- inc = 1
- msg_parts.append("multiple configuration errors detected:")
-
- for c in self._child_exceptions:
- msg_parts.append(c.recursive_msg(indentation_level + inc))
- return "\n".join(msg_parts)
-
-
-class DataException(KresManagerException):
- pass
-
-
-class ParsingException(KresManagerException):
- pass
SubprocessType,
)
from knot_resolver_manager.utils.functional import Result
-from knot_resolver_manager.utils.types import NoneType
+from knot_resolver_manager.utils.modeling.types import NoneType
from .datamodel import KresConfig
from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME, init_user_constants
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.datamodel.management_schema import ManagementSchema
-from knot_resolver_manager.exceptions import (
- CancelStartupExecInsteadException,
- DataException,
- KresManagerException,
- SchemaException,
-)
+from knot_resolver_manager.exceptions import CancelStartupExecInsteadException, KresManagerException
from knot_resolver_manager.kresd_controller import get_best_controller_implementation
from knot_resolver_manager.utils.async_utils import readfile
from knot_resolver_manager.utils.functional import Result
-from knot_resolver_manager.utils.parsing import ParsedTree, parse, parse_yaml
+from knot_resolver_manager.utils.modeling import ParsedTree, parse, parse_yaml
+from knot_resolver_manager.utils.modeling.exceptions import DataParsingError, DataValidationError
+from knot_resolver_manager.utils.modeling.types import NoneType
from knot_resolver_manager.utils.systemd_notify import systemd_notify
-from knot_resolver_manager.utils.types import NoneType
from .kres_manager import KresManager
try:
return await handler(request)
+ except DataValidationError as e:
+ return web.Response(text=f"validation of configuration failed: {e}", status=HTTPStatus.BAD_REQUEST)
except KresManagerException as e:
- if isinstance(e, (SchemaException, DataException)):
- return web.Response(text=f"validation of configuration failed: {e}", status=HTTPStatus.BAD_REQUEST)
- else:
- logger.error("Request processing failed", exc_info=True)
- return web.Response(text=f"Request processing failed: {e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
+ logger.error("Request processing failed", exc_info=True)
+ return web.Response(text=f"Request processing failed: {e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
class Server:
else:
try:
data = await readfile(self._config_path)
- config = KresConfig(parse_yaml(data))
+ config = KresConfig(ParsedTree(data))
await self.config_store.update(config)
logger.info("Configuration file successfully reloaded")
except FileNotFoundError:
" Something must have happened to it while we were running."
)
logger.error("Configuration have NOT been changed.")
- except SchemaException as e:
+ except (DataParsingError, DataValidationError) as e:
logger.error(f"Failed to parse the updated configuration file: {e}")
logger.error("Configuration have NOT been changed.")
except KresManagerException as e:
from typing import Any, Callable, Optional, Type, TypeVar
-from .custom_types import CustomValueType
-from .modelling import SchemaNode
-
T = TypeVar("T")
Function, which consumes its argument doing absolutely nothing with it. Useful
for convincing pylint, that we need the variable even when its unused.
"""
-
-
-__all__ = [
- "CustomValueType",
- "SchemaNode",
-]
--- /dev/null
+from .custom_value_type import CustomValueType
+from .parsed_tree import ParsedTree, parse, parse_json, parse_yaml
+from .schema_node import SchemaNode
+
+__all__ = [
+ "CustomValueType",
+ "SchemaNode",
+ "ParsedTree",
+ "parse",
+ "parse_yaml",
+ "parse_json",
+]
There is no validation done on the wrapped value. The only condition is that
it can't be `None`. If you want to perform any validation during creation,
- raise a `SchemaException` in case of errors.
+ raise a `DataValidationError` in case of errors.
"""
def __init__(self, source_value: Any, object_path: str = "/") -> None:
--- /dev/null
+from typing import Iterable, List
+
+
+class DataModelingBaseException(Exception):
+ """
+ Base class for all exceptions used in modelling.
+ """
+
+
+class DataParsingError(DataModelingBaseException):
+ pass
+
+
+class DataDescriptionError(DataModelingBaseException):
+ pass
+
+
+class DataValidationError(DataModelingBaseException):
+ def __init__(self, msg: str, tree_path: str, child_exceptions: "Iterable[DataValidationError]" = tuple()) -> None:
+ super().__init__(msg)
+ self._tree_path = tree_path
+ self._child_exceptions = child_exceptions
+
+ def where(self) -> str:
+ return self._tree_path
+
+ def msg(self):
+ return f"[{self.where()}] " + super().__str__()
+
+ def recursive_msg(self, indentation_level: int = 0) -> str:
+ INDENT = indentation_level * "\t"
+ msg_parts: List[str] = [f"{INDENT}{self.msg()}"]
+ for c in self._child_exceptions:
+ msg_parts.append(c.recursive_msg(indentation_level + 1))
+ return "\n".join(msg_parts)
+
+ def __str__(self) -> str:
+ return self.recursive_msg()
+
+
+class AggregateDataValidationError(DataValidationError):
+ def __init__(self, object_path: str, child_exceptions: "Iterable[DataValidationError]") -> None:
+ super().__init__("error due to lower level exceptions", object_path, child_exceptions)
+
+ def recursive_msg(self, indentation_level: int = 0) -> str:
+ inc = 0
+ msg_parts: List[str] = []
+ if indentation_level == 0:
+ inc = 1
+ msg_parts.append("multiple configuration errors detected:")
+
+ for c in self._child_exceptions:
+ msg_parts.append(c.recursive_msg(indentation_level + inc))
+ return "\n".join(msg_parts)
from yaml.constructor import ConstructorError
from yaml.nodes import MappingNode
-from knot_resolver_manager.exceptions import DataException, ParsingException
-from knot_resolver_manager.utils.types import is_internal_field_name
+from .exceptions import DataParsingError
+from .types import is_internal_field_name
class ParsedTree:
# prepare and validate the path object
path = path[:-1] if path.endswith("/") else path
if re.match(ParsedTree._SUBTREE_MUTATION_PATH_PATTERN, path) is None:
- raise ParsingException("Provided object path for mutation is invalid.")
+ raise DataParsingError("Provided object path for mutation is invalid.")
if "_" in path:
- raise ParsingException("Provided object path contains character '_', which is illegal")
+ raise DataParsingError("Provided object path contains character '_', which is illegal")
# Note: mutation happens on the internal dict only, therefore we are working with external
# naming only. That means, there are '-' in between words.
path = path[1:] if path.startswith("/") else path
assert isinstance(obj, dict)
if segment == "":
- raise ParsingException(f"Unexpectedly empty segment in path '{path}'")
+ raise DataParsingError(f"Unexpectedly empty segment in path '{path}'")
elif is_internal_field_name(segment):
- raise ParsingException(
+ raise DataParsingError(
"No, changing internal fields (starting with _) is not allowed. Nice try though."
)
elif segment in obj:
dict_out: Dict[Any, Any] = {}
for key, val in pairs:
if key in dict_out:
- raise DataException(f"Duplicate attribute key detected: {key}")
+ raise DataParsingError(f"Duplicate attribute key detected: {key}")
dict_out[key] = val
return dict_out
# check for duplicate keys
if key in mapping:
- raise DataException(f"duplicate key detected: {key_node.start_mark}")
+ raise DataParsingError(f"duplicate key detected: {key_node.start_mark}")
value = self.construct_object(value_node, deep=deep) # type: ignore
mapping[key] = value
return mapping
"text/vnd.yaml": _Format.YAML,
}
if mime_type not in formats:
- raise DataException("Unsupported MIME type")
+ raise DataParsingError("Unsupported MIME type")
return formats[mime_type]
import yaml
-from knot_resolver_manager.exceptions import AggregateSchemaException, DataException, SchemaException
-from knot_resolver_manager.utils.custom_types import CustomValueType
from knot_resolver_manager.utils.functional import all_matches
-from knot_resolver_manager.utils.parsing import ParsedTree
-from knot_resolver_manager.utils.types import (
+
+from .custom_value_type import CustomValueType
+from .exceptions import AggregateDataValidationError, DataDescriptionError, DataValidationError
+from .parsed_tree import ParsedTree
+from .types import (
NoneType,
get_generic_type_argument,
get_generic_type_arguments,
# description
if attribute_documentation is not None:
if field_name not in attribute_documentation:
- raise SchemaException(f"The docstring does not describe field '{field_name}'", str(typ))
+ raise DataDescriptionError(f"The docstring does not describe field '{field_name}'", str(typ))
schema[name]["description"] = attribute_documentation[field_name]
del attribute_documentation[field_name]
schema[name]["default"] = Serializable.serialize(getattr(typ, field_name))
if attribute_documentation is not None and len(attribute_documentation) > 0:
- raise SchemaException(
+ raise DataDescriptionError(
f"The docstring describes attributes which are not present - {tuple(attribute_documentation.keys())}",
str(typ),
)
def _validated_tuple(cls: Type[Any], obj: Tuple[Any, ...], object_path: str) -> Tuple[Any, ...]:
types = get_generic_type_arguments(cls)
- errs: List[SchemaException] = []
+ errs: List[DataValidationError] = []
res: List[Any] = []
for i, (tp, val) in enumerate(zip(types, obj)):
try:
res.append(_validated_object_type(tp, val, object_path=f"{object_path}[{i}]"))
- except SchemaException as e:
+ except DataValidationError as e:
errs.append(e)
if len(errs) == 1:
raise errs[0]
elif len(errs) > 1:
- raise AggregateSchemaException(object_path, child_exceptions=errs)
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
return tuple(res)
def _validated_dict(cls: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
key_type, val_type = get_generic_type_arguments(cls)
try:
- errs: List[SchemaException] = []
+ errs: List[DataValidationError] = []
res: Dict[Any, Any] = {}
for key, val in obj.items():
try:
nkey = _validated_object_type(key_type, key, object_path=f"{object_path}[{key}]")
nval = _validated_object_type(val_type, val, object_path=f"{object_path}[{key}]")
res[nkey] = nval
- except SchemaException as e:
+ except DataValidationError as e:
errs.append(e)
if len(errs) == 1:
raise errs[0]
elif len(errs) > 1:
- raise AggregateSchemaException(object_path, child_exceptions=errs)
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
return res
except AttributeError as e:
- raise SchemaException(
+ raise DataValidationError(
f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
) from e
def _validated_list(cls: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
inner_type = get_generic_type_argument(cls)
- errs: List[SchemaException] = []
+ errs: List[DataValidationError] = []
res: List[Any] = []
for i, val in enumerate(obj):
try:
res.append(_validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
- except SchemaException as e:
+ except DataValidationError as e:
errs.append(e)
if len(errs) == 1:
raise errs[0]
elif len(errs) > 1:
- raise AggregateSchemaException(object_path, child_exceptions=errs)
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
return res
if obj is None:
return None
else:
- raise SchemaException(f"expected None, found '{obj}'.", object_path)
+ raise DataValidationError(f"expected None, found '{obj}'.", object_path)
# Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
elif is_optional(cls):
# Union[*variants]
elif is_union(cls):
variants = get_generic_type_arguments(cls)
- errs: List[SchemaException] = []
+ errs: List[DataValidationError] = []
for v in variants:
try:
return _validated_object_type(v, obj, object_path=object_path)
- except SchemaException as e:
+ except DataValidationError as e:
errs.append(e)
- raise SchemaException("could not parse any of the possible variants", object_path, child_exceptions=errs)
+ raise DataValidationError("could not parse any of the possible variants", object_path, child_exceptions=errs)
# after this, there is no place for a None object
elif obj is None:
- raise SchemaException(f"unexpected value 'None' for type {cls}", object_path)
+ raise DataValidationError(f"unexpected value 'None' for type {cls}", object_path)
# int
elif cls == int:
# except for CustomValueType class instances
if is_obj_type(obj, int) or isinstance(obj, CustomValueType):
return int(obj)
- raise SchemaException(f"expected int, found {type(obj)}", object_path)
+ raise DataValidationError(f"expected int, found {type(obj)}", object_path)
# str
elif cls == str:
if is_obj_type(obj, (str, float, int)) or isinstance(obj, CustomValueType):
return str(obj)
elif is_obj_type(obj, bool):
- raise SchemaException(
+ raise DataValidationError(
"Expected str, found bool. Be careful, that YAML parsers consider even"
' "no" and "yes" as a bool. Search for the Norway Problem for more'
" details. And please use quotes explicitly.",
object_path,
)
else:
- raise SchemaException(
+ raise DataValidationError(
f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
)
if is_obj_type(obj, bool):
return obj
else:
- raise SchemaException(f"expected bool, found {type(obj)}", object_path)
+ raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
# float
elif cls == float:
if obj in expected:
return obj
else:
- raise SchemaException(f"'{obj}' does not match any of the expected values {expected}", object_path)
+ raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
# Dict[K,V]
elif is_dict(cls):
if isinstance(obj, cls):
return obj
else:
- raise SchemaException(f"unexpected value '{obj}' for enum '{cls}'", object_path)
+ raise DataValidationError(f"unexpected value '{obj}' for enum '{cls}'", object_path)
# List[T]
elif is_list(cls):
if isinstance(obj, str):
- raise SchemaException("expected list, got string", object_path)
+ raise DataValidationError("expected list, got string", object_path)
return _validated_list(cls, obj, object_path)
# Tuple[A,B,C,D,...]
# because we can construct a DataParser from it
if isinstance(obj, (dict, SchemaNode)):
return cls(obj, object_path=object_path) # type: ignore
- raise SchemaException(f"expected 'dict' or 'SchemaNode' object, found '{type(obj)}'", object_path)
+ raise DataValidationError(f"expected 'dict' or 'SchemaNode' object, found '{type(obj)}'", object_path)
# if the object matches, just pass it through
elif inspect.isclass(cls) and isinstance(obj, cls):
# default error handler
else:
- raise SchemaException(
+ raise DataValidationError(
f"Type {cls} cannot be parsed. This is a implementation error. "
"Please fix your types in the class or improve the parser/validator.",
object_path,
"""
cls = self.__class__
annot = cls.__dict__.get("__annotations__", {})
- errs: List[SchemaException] = []
+ errs: List[DataValidationError] = []
used_keys: Set[str] = set()
for name, python_type in annot.items():
# we expected a value but it was not there
else:
- errs.append(SchemaException(f"missing attribute '{name}'.", object_path))
- except SchemaException as e:
+ errs.append(DataValidationError(f"missing attribute '{name}'.", object_path))
+ except DataValidationError as e:
errs.append(e)
if len(errs) == 1:
raise errs[0]
elif len(errs) > 1:
- raise AggregateSchemaException(object_path, errs)
+ raise AggregateDataValidationError(object_path, errs)
return used_keys
def __init__(self, source: TSource = None, object_path: str = ""):
unused = source.keys() - used_keys
if len(unused) > 0:
keys = ", ".join((f"'{u}'" for u in unused))
- raise SchemaException(
+ raise DataValidationError(
f"unexpected extra key(s) {keys}",
object_path,
)
try:
self._validate()
except ValueError as e:
- raise SchemaException(e.args[0] if len(e.args) > 0 else "Validation error", object_path) from e
+ raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path) from e
def get_unparsed_data(self) -> ParsedTree:
if isinstance(self._source, SchemaNode):
return func(_create_untouchable("self"), source)
else:
raise RuntimeError("Transformation function has wrong number of arguments")
- except (ValueError, DataException) as e:
+ except ValueError as e:
if len(e.args) > 0 and isinstance(e.args[0], str):
msg = e.args[0]
else:
msg = "Failed to validate value type"
- raise SchemaException(msg, object_path) from e
+ raise DataValidationError(msg, object_path) from e
def __getitem__(self, key: str) -> Any:
if not hasattr(self, key):
'knot_resolver_manager.kresd_controller',
'knot_resolver_manager.kresd_controller.supervisord',
'knot_resolver_manager.kresd_controller.supervisord.plugin',
- 'knot_resolver_manager.utils']
+ 'knot_resolver_manager.utils',
+ 'knot_resolver_manager.utils.modeling']
package_data = \
{'': ['*'],
from pytest import raises
from knot_resolver_manager.datamodel.lua_schema import LuaSchema
-from knot_resolver_manager.exceptions import KresManagerException
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
def test_invalid():
- with raises(KresManagerException):
+ with raises(DataValidationError):
LuaSchema({"script": "-- lua script", "script-file": "path/to/file"})
import pytest
from knot_resolver_manager.datamodel.management_schema import ManagementSchema
-from knot_resolver_manager.exceptions import KresManagerException
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
@pytest.mark.parametrize("val", [{"interface": "::1@53"}, {"unix-socket": "/path/socket"}])
@pytest.mark.parametrize("val", [None, {"interface": "::1@53", "unix-socket": "/path/socket"}])
def test_management_invalid(val: Optional[Dict[str, Any]]):
- with pytest.raises(KresManagerException):
+ with pytest.raises(DataValidationError):
ManagementSchema(val)
from knot_resolver_manager.datamodel.network_schema import ListenSchema, NetworkSchema
from knot_resolver_manager.datamodel.types import PortNumber
from knot_resolver_manager.datamodel.types.types import InterfaceOptionalPort
-from knot_resolver_manager.exceptions import KresManagerException
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
def test_listen_defaults():
],
)
def test_listen_invalid(listen: Dict[str, Any]):
- with raises(KresManagerException):
+ with raises(DataValidationError):
ListenSchema(listen)
from knot_resolver_manager.datamodel.policy_schema import ActionSchema, PolicySchema
from knot_resolver_manager.datamodel.types import PolicyActionEnum
-from knot_resolver_manager.exceptions import KresManagerException
-from knot_resolver_manager.utils.types import get_generic_type_arguments
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
+from knot_resolver_manager.utils.modeling.types import get_generic_type_arguments
noconfig_actions = [
"pass",
@pytest.mark.parametrize("val", [{"action": "invalid-action"}])
def test_action_invalid(val: Dict[str, Any]):
- with raises(KresManagerException):
+ with raises(DataValidationError):
PolicySchema(val)
- with raises(KresManagerException):
+ with raises(DataValidationError):
ActionSchema(val)
],
)
def test_policy_invalid(val: Dict[str, Any]):
- with raises(KresManagerException):
+ with raises(DataValidationError):
PolicySchema(val)
- with raises(KresManagerException):
+ with raises(DataValidationError):
ActionSchema(val)
noconfig_actions,
)
def test_policy_message_invalid(val: str):
- with raises(KresManagerException):
+ with raises(DataValidationError):
PolicySchema({"action": f"{val}", "message": "this is deny message"})
- with raises(KresManagerException):
+ with raises(DataValidationError):
ActionSchema({"action": f"{val}", "message": "this is deny message"})
from pytest import raises
from knot_resolver_manager.datamodel.rpz_schema import RPZSchema
-from knot_resolver_manager.exceptions import KresManagerException
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
@pytest.mark.parametrize(
],
)
def test_message_invalid(val: str):
- with raises(KresManagerException):
+ with raises(DataValidationError):
RPZSchema({"action": f"{val}", "file": "whitelist.rpz", "message": "this is deny message"})
SizeUnit,
TimeUnit,
)
-from knot_resolver_manager.exceptions import KresManagerException
-from knot_resolver_manager.utils import SchemaNode
+from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
def _rand_domain(label_chars: int, levels: int = 1) -> str:
@pytest.mark.parametrize("val", [0, 65_636, -1, "53"])
def test_port_number_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
PortNumber(val)
@pytest.mark.parametrize("val", ["-5B", 5, -5242880, "45745mB"])
def test_size_unit_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
SizeUnit(val)
@pytest.mark.parametrize("val", ["-1", "-24h", "1440mm", 6575, -1440])
def test_time_unit_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
TimeUnit("-1")
],
)
def test_domain_name_invalid(val: str):
- with raises(KresManagerException):
+ with raises(DataValidationError):
DomainName(val)
@pytest.mark.parametrize("val", ["_lo", "-wlo1", "lo_", "wlo1-", "e8--2", "web__ifgrp"])
def test_interface_name_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
InterfaceName(val)
@pytest.mark.parametrize("val", ["lo", "2001:db8::1000", "53"])
def test_interface_port_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
InterfacePort(val)
@pytest.mark.parametrize("val", ["lo@", "@53"])
def test_interface_optional_port_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
InterfaceOptionalPort(val)
"val", ["123.4.5.6", "2001:db8::1000", "123.4.5.6.7@5000", "2001:db8::10000@5001", "123.4.5.6@"]
)
def test_ip_address_port_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
IPAddressPort(val)
@pytest.mark.parametrize("val", ["123.4.5.6.7", "2001:db8::10000", "123.4.5.6@", "@55"])
def test_ip_address_optional_port_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
IPAddressOptionalPort(val)
@pytest.mark.parametrize("val", ["123456", "2001:db8::1000"])
def test_ipv4_address_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
IPv4Address(val)
@pytest.mark.parametrize("val", ["123.4.5.6", "2001::db8::1000"])
def test_ipv6_address_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
IPv6Address(val)
@pytest.mark.parametrize("val", ["10.11.12.13/8", "10.11.12.5/128"])
def test_ip_network_invalid(val: str):
- with raises(KresManagerException):
+ with raises(DataValidationError):
IPNetwork(val)
@pytest.mark.parametrize("val", ["fe80::/95", "10.11.12.3/96", "64:ff9b::1/96"])
def test_ipv6_96_network_invalid(val: Any):
- with raises(KresManagerException):
+ with raises(DataValidationError):
IPv6Network96(val)
from pytest import raises
from typing_extensions import Literal
-from knot_resolver_manager.exceptions import SchemaException
-from knot_resolver_manager.utils import SchemaNode
-from knot_resolver_manager.utils.parsing import parse_json, parse_yaml
+from knot_resolver_manager.utils.modeling import SchemaNode, parse_json, parse_yaml
+from knot_resolver_manager.utils.modeling.exceptions import DataDescriptionError, DataValidationError
class _TestBool(SchemaNode):
@pytest.mark.parametrize("val", ["0", "1", "5", "'true'", "'false'", "5.5"]) # int, str, float
def test_parsing_bool_invalid(val: str):
- with raises(SchemaException):
+ with raises(DataValidationError):
_TestBool(parse_yaml(f"v: {val}"))
@pytest.mark.parametrize("val", ["false", "'5'", "5.5"]) # bool, str, float
def test_parsing_int_invalid(val: str):
- with raises(SchemaException):
+ with raises(DataValidationError):
_TestInt(parse_yaml(f"v: {val}"))
def test_parsing_str_invalid():
- with raises(SchemaException):
+ with raises(DataValidationError):
_TestStr(parse_yaml("v: false")) # bool
assert o.workers == 8
assert o.inner.size == 33
- # raise validation SchemaException
- with raises(SchemaException):
+ # raise validation DataValidationError
+ with raises(DataValidationError):
o = ConfSchema(d.update("/", parse_json('{"workers": -5}')))
nothing: str
- with raises(SchemaException):
+ with raises(DataDescriptionError):
_ = AdditionalItem.json_schema()
class WrongDescription(SchemaNode):
nothing: str
- with raises(SchemaException):
+ with raises(DataDescriptionError):
_ = WrongDescription.json_schema()
import pytest
from typing_extensions import Literal
-from knot_resolver_manager.utils.modelling import SchemaNode
-from knot_resolver_manager.utils.types import is_list, is_literal
+from knot_resolver_manager.utils.modeling import SchemaNode
+from knot_resolver_manager.utils.modeling.types import is_list, is_literal
types = [
bool,