+from typing import Iterable, List
+
+
class KresManagerException(Exception):
"""
Base class for all custom exceptions we use in our code
pass
-class TreeException(KresManagerException):
- def __init__(self, msg: str, tree_path: str) -> None:
+class SchemaException(KresManagerException):
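+ """
+ Validation error bound to a specific location (tree_path) in the configuration tree.
+ Optionally carries child exceptions so that nested validation failures can be reported together.
+ """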
+ def __init__(self, msg: str, tree_path: str, child_exceptions: "Iterable[SchemaException]" = tuple()) -> None:
super().__init__(msg)
self._tree_path = tree_path
+ self._child_exceptions = child_exceptions
def where(self) -> str:
return self._tree_path
+ def msg(self) -> str:
+ return f"field {self.where()}: " + super().__str__()
+
+ def recursive_msg(self, indentation_level: int = 0) -> str:
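+ """Render this exception and all of its child exceptions as a tab-indented tree, one line per error."""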
+ INDENT = indentation_level * "\t"
+ msg_parts: List[str] = [f"{INDENT}{self.msg()}"]
+ for c in self._child_exceptions:
+ msg_parts.append(c.recursive_msg(indentation_level + 1))
+ return "\n".join(msg_parts)
+
def __str__(self) -> str:
- return f"configuration field {self.where()}: " + super().__str__()
+ return self.recursive_msg()
-class SchemaException(TreeException):
- pass
+class AggregateSchemaException(SchemaException):
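+ """
+ Groups several SchemaExceptions raised while validating a single object under one object_path.
+ """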
+ def __init__(self, object_path: str, child_exceptions: "Iterable[SchemaException]") -> None:
+ super().__init__("error due to lower level exceptions", object_path, child_exceptions)
+
+ def recursive_msg(self, indentation_level: int = 0) -> str:
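+ """
+ At the top level, print a summary header and indent the children below it;
+ when nested, the aggregate itself is transparent and only its children are rendered.
+ """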
+ inc = 0
+ msg_parts: List[str] = []
+ if indentation_level == 0:
+ inc = 1
+ msg_parts.append("multiple configuration errors detected:")
+
+ for c in self._child_exceptions:
+ msg_parts.append(c.recursive_msg(indentation_level + inc))
+ return "\n".join(msg_parts)
class DataException(KresManagerException):
import yaml
-from knot_resolver_manager.exceptions import DataException, SchemaException
+from knot_resolver_manager.exceptions import AggregateSchemaException, DataException, SchemaException
from knot_resolver_manager.utils.custom_types import CustomValueType
from knot_resolver_manager.utils.functional import all_matches
from knot_resolver_manager.utils.parsing import ParsedTree
raise NotImplementedError(f"Trying to get JSON schema for type '{typ}', which is not implemented")
+def _validated_tuple(cls: Type[Any], obj: Tuple[Any, ...], object_path: str) -> Tuple[Any, ...]:
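+ """Validate every tuple member against its declared type, reporting all failures at once."""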
+ types = get_generic_type_arguments(cls)
+ errs: List[SchemaException] = []
+ res: List[Any] = []
+ for i, (tp, val) in enumerate(zip(types, obj)):
+ try:
+ res.append(_validated_object_type(tp, val, object_path=f"{object_path}[{i}]"))
+ except SchemaException as e:
+ errs.append(e)
+ if len(errs) > 0:
+ raise AggregateSchemaException(object_path, child_exceptions=errs)
+ return tuple(res)
+
+
+def _validated_dict(cls: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
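+ """Validate all keys and values against the declared key/value types, reporting all failures at once."""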
+ key_type, val_type = get_generic_type_arguments(cls)
+ try:
+ errs: List[SchemaException] = []
+ res: Dict[Any, Any] = {}
+ for key, val in obj.items():
+ try:
+ nkey = _validated_object_type(key_type, key, object_path=f"{object_path}[{key}]")
+ nval = _validated_object_type(val_type, val, object_path=f"{object_path}[{key}]")
+ res[nkey] = nval
+ except SchemaException as e:
+ errs.append(e)
+ if len(errs) > 0:
+ raise AggregateSchemaException(object_path, child_exceptions=errs)
+ return res
+ except AttributeError as e:
+ raise SchemaException(
+ f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
+ ) from e
+
+
+def _validated_list(cls: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
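+ """Validate every list item against the declared item type, reporting all failing indices at once."""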
+ inner_type = get_generic_type_argument(cls)
+ errs: List[SchemaException] = []
+ res: List[Any] = []
+ for i, val in enumerate(obj):
+ try:
+ res.append(_validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
+ except SchemaException as e:
+ errs.append(e)
+ if len(errs) > 0:
+ raise AggregateSchemaException(object_path, child_exceptions=errs)
+ return res
+
+
def _validated_object_type(
cls: Type[Any], obj: Any, default: Any = ..., use_default: bool = False, object_path: str = "/"
) -> Any:
except SchemaException as e:
errs.append(e)
- err_string = "\n\t- ".join([str(e) for e in errs])
- raise SchemaException(f"failed to parse union type, all variants failed:\n\t- {err_string}", object_path)
+ raise SchemaException("failed to parse union type, all variants failed", object_path, child_exceptions=errs)
# after this, there is no place for a None object
elif obj is None:
# Dict[K,V]
elif is_dict(cls):
- key_type, val_type = get_generic_type_arguments(cls)
- try:
- return {
- _validated_object_type(key_type, key, object_path=f"{object_path} @ key {key}"): _validated_object_type(
- val_type, val, object_path=f"{object_path} @ value for key {key}"
- )
- for key, val in obj.items()
- }
- except AttributeError as e:
- raise SchemaException(
- f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
- ) from e
+ return _validated_dict(cls, obj, object_path)
# any Enums (probably used only internally in DataValidator)
elif is_enum(cls):
# List[T]
elif is_list(cls):
- inner_type = get_generic_type_argument(cls)
- return [_validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]") for i, val in enumerate(obj)]
+ return _validated_list(cls, obj, object_path)
# Tuple[A,B,C,D,...]
elif is_tuple(cls):
- types = get_generic_type_arguments(cls)
- return tuple(_validated_object_type(typ, val, object_path=object_path) for typ, val in zip(types, obj))
+ return _validated_tuple(cls, obj, object_path)
# type of obj and cls type match
elif is_obj_type(obj, cls):
"""
cls = self.__class__
annot = cls.__dict__.get("__annotations__", {})
+ errs: List[SchemaException] = []
used_keys: Set[str] = set()
for name, python_type in annot.items():
- if is_internal_field_name(name):
- continue
-
- # populate field
- if source is None:
- self._assign_default(name, python_type, object_path)
-
- # check for invalid configuration with both transformation function and default value
- elif hasattr(self, f"_{name}") and hasattr(self, name):
- raise RuntimeError(
- f"Field '{self.__class__.__name__}.{name}' has default value and transformation function at"
- " the same time. That is now allowed. Store the default in the transformation function."
- )
-
- # there is a transformation function to create the value
- elif hasattr(self, f"_{name}") and callable(getattr(self, f"_{name}")):
- val = self._get_converted_value(name, source, object_path)
- self._assign_field(name, python_type, val, object_path)
- used_keys.add(name)
-
- # source just contains the value
- elif name in source:
- val = source[name]
- self._assign_field(name, python_type, val, object_path)
- used_keys.add(name)
-
- # there is a default value, or the type is optional => store the default or null
- elif hasattr(self, name) or is_optional(python_type):
- self._assign_default(name, python_type, object_path)
-
- # we expected a value but it was not there
- else:
- raise SchemaException(f"Missing attribute '{name}'.", object_path)
+ try:
+ if is_internal_field_name(name):
+ continue
+
+ # populate field
+ if source is None:
+ self._assign_default(name, python_type, object_path)
+
+ # check for invalid configuration with both transformation function and default value
+ elif hasattr(self, f"_{name}") and hasattr(self, name):
+ raise RuntimeError(
+ f"Field '{self.__class__.__name__}.{name}' has default value and transformation function at"
+ " the same time. That is now allowed. Store the default in the transformation function."
+ )
+
+ # there is a transformation function to create the value
+ elif hasattr(self, f"_{name}") and callable(getattr(self, f"_{name}")):
+ val = self._get_converted_value(name, source, object_path)
+ self._assign_field(name, python_type, val, object_path)
+ used_keys.add(name)
+
+ # source just contains the value
+ elif name in source:
+ val = source[name]
+ self._assign_field(name, python_type, val, object_path)
+ used_keys.add(name)
+
+ # there is a default value, or the type is optional => store the default or null
+ elif hasattr(self, name) or is_optional(python_type):
+ self._assign_default(name, python_type, object_path)
+
+ # we expected a value but it was not there
+ else:
+ errs.append(SchemaException(f"missing attribute '{name}'.", object_path))
+ except SchemaException as e:
+ errs.append(e)
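+ # Raise all collected per-field errors together so the user sees every problem in a single pass.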
+ if len(errs) > 0:
+ raise AggregateSchemaException(object_path, errs)
return used_keys
def __init__(self, source: TSource = None, object_path: str = ""):