from .exceptions import AggregateDataValidationError, DataDescriptionError, DataValidationError
from .renaming import Renamed, renamed
from .types import (
- NoneType,
get_generic_type_argument,
get_generic_type_arguments,
get_optional_inner_type,
is_union,
)
-
T = TypeVar("T")
raise NotImplementedError(f"Trying to get JSON schema for type '{typ}', which is not implemented")
-TSource = Union[None, "NoRenameBaseSchema", Dict[str, Any]]
+TSource = Union[None, "BaseSchema", Dict[str, Any]]
def _create_untouchable(name: str) -> object:
return _Untouchable()
-class Mapper:
- def _validated_tuple(self, tp: Type[Any], obj: Tuple[Any, ...], object_path: str) -> Tuple[Any, ...]:
+class ObjectMapper:
+ def _create_tuple(self, tp: Type[Any], obj: Tuple[Any, ...], object_path: str) -> Tuple[Any, ...]:
types = get_generic_type_arguments(tp)
errs: List[DataValidationError] = []
res: List[Any] = []
for i, (t, val) in enumerate(zip(types, obj)):
try:
- res.append(self.validated_object_type(t, val, object_path=f"{object_path}[{i}]"))
+ res.append(self.map_object(t, val, object_path=f"{object_path}[{i}]"))
except DataValidationError as e:
errs.append(e)
if len(errs) == 1:
raise AggregateDataValidationError(object_path, child_exceptions=errs)
return tuple(res)
- def _validated_dict(self, tp: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
+ def _create_dict(self, tp: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
key_type, val_type = get_generic_type_arguments(tp)
try:
errs: List[DataValidationError] = []
res: Dict[Any, Any] = {}
for key, val in obj.items():
try:
- nkey = self.validated_object_type(key_type, key, object_path=f"{object_path}[{key}]")
- nval = self.validated_object_type(val_type, val, object_path=f"{object_path}[{key}]")
+ nkey = self.map_object(key_type, key, object_path=f"{object_path}[{key}]")
+ nval = self.map_object(val_type, val, object_path=f"{object_path}[{key}]")
res[nkey] = nval
except DataValidationError as e:
errs.append(e)
f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
) from e
- def _validated_list(self, tp: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
+ def _create_list(self, tp: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
+ if isinstance(obj, str):
+ raise DataValidationError("expected list, got string", object_path)
+
inner_type = get_generic_type_argument(tp)
errs: List[DataValidationError] = []
res: List[Any] = []
for i, val in enumerate(obj):
try:
- res.append(self.validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
+ res.append(self.map_object(inner_type, val, object_path=f"{object_path}[{i}]"))
except DataValidationError as e:
errs.append(e)
if len(errs) == 1:
raise AggregateDataValidationError(object_path, child_exceptions=errs)
return res
- def validated_object_type(
+ def _create_str(self, obj: Any, object_path: str) -> str:
+ # we are willing to cast any primitive value to string, but no compound values are allowed
+ if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
+ return str(obj)
+ elif is_obj_type(obj, bool):
+ raise DataValidationError(
+ "Expected str, found bool. Be careful, that YAML parsers consider even"
+ ' "no" and "yes" as a bool. Search for the Norway Problem for more'
+ " details. And please use quotes explicitly.",
+ object_path,
+ )
+ else:
+ raise DataValidationError(
+ f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
+ )
+
+ def _create_int(self, obj: Any, object_path: str) -> int:
+ # we don't want to make an int out of anything else than other int
+ # except for BaseValueType class instances
+ if is_obj_type(obj, int) or isinstance(obj, BaseValueType):
+ return int(obj)
+ raise DataValidationError(f"expected int, found {type(obj)}", object_path)
+
+ def _create_union(self, tp: Type[T], obj: Any, object_path: str) -> T:
+ variants = get_generic_type_arguments(tp)
+ errs: List[DataValidationError] = []
+ for v in variants:
+ try:
+ return self.map_object(v, obj, object_path=object_path)
+ except DataValidationError as e:
+ errs.append(e)
+
+ raise DataValidationError("could not parse any of the possible variants", object_path, child_exceptions=errs)
+
+ def _create_optional(self, tp: Type[Optional[T]], obj: Any, object_path: str) -> Optional[T]:
+ inner: Type[Any] = get_optional_inner_type(tp)
+ if obj is None:
+ return None
+ else:
+ return self.map_object(inner, obj, object_path=object_path)
+
+ def _create_bool(self, obj: Any, object_path: str) -> bool:
+ if is_obj_type(obj, bool):
+ return obj
+ else:
+ raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
+
+ def _create_literal(self, tp: Type[Any], obj: Any, object_path: str) -> Any:
+ expected = get_generic_type_arguments(tp)
+ if obj in expected:
+ return obj
+ else:
+ raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
+
+ def _create_base_schema_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseSchema":
+ if isinstance(obj, (dict, BaseSchema)):
+ return tp(obj, object_path=object_path)
+ raise DataValidationError(f"expected 'dict' or 'NoRenameBaseSchema' object, found '{type(obj)}'", object_path)
+
+ def create_value_type_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseValueType":
+ if isinstance(obj, tp):
+ # if we already have a custom value type, just pass it through
+ return obj
+ else:
+ # no validation performed, the implementation does it in the constuctor
+ try:
+ return tp(obj, object_path=object_path)
+ except ValueError as e:
+ if len(e.args) > 0 and isinstance(e.args[0], str):
+ msg = e.args[0]
+ else:
+ msg = f"Failed to validate value against {tp} type"
+ raise DataValidationError(msg, object_path) from e
+
+ def map_object(
self,
tp: Type[Any],
obj: Any,
object_path: str = "/",
) -> Any:
"""
- Given an expected type `cls` and a value object `obj`, validate the type of `obj` and return it
+ Given an expected type `cls` and a value object `obj`, return a new object of the given type and map fields of `obj` into it. During the mapping procedure,
+ runtime type checking is performed.
"""
# Disabling these checks, because I think it's much more readable as a single function
# Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
elif is_optional(tp):
- inner: Type[Any] = get_optional_inner_type(tp)
- if obj is None:
- return None
- else:
- return self.validated_object_type(inner, obj, object_path=object_path)
+ return self._create_optional(tp, obj, object_path)
# Union[*variants]
elif is_union(tp):
- variants = get_generic_type_arguments(tp)
- errs: List[DataValidationError] = []
- for v in variants:
- try:
- return self.validated_object_type(v, obj, object_path=object_path)
- except DataValidationError as e:
- errs.append(e)
-
- raise DataValidationError(
- "could not parse any of the possible variants", object_path, child_exceptions=errs
- )
+ return self._create_union(tp, obj, object_path)
# after this, there is no place for a None object
elif obj is None:
# int
elif tp == int:
- # we don't want to make an int out of anything else than other int
- # except for BaseValueType class instances
- if is_obj_type(obj, int) or isinstance(obj, BaseValueType):
- return int(obj)
- raise DataValidationError(f"expected int, found {type(obj)}", object_path)
+ return self._create_int(obj, object_path)
# str
elif tp == str:
- # we are willing to cast any primitive value to string, but no compound values are allowed
- if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
- return str(obj)
- elif is_obj_type(obj, bool):
- raise DataValidationError(
- "Expected str, found bool. Be careful, that YAML parsers consider even"
- ' "no" and "yes" as a bool. Search for the Norway Problem for more'
- " details. And please use quotes explicitly.",
- object_path,
- )
- else:
- raise DataValidationError(
- f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
- )
+ return self._create_str(obj, object_path)
# bool
elif tp == bool:
- if is_obj_type(obj, bool):
- return obj
- else:
- raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
+ return self._create_bool(obj, object_path)
# float
elif tp == float:
raise NotImplementedError(
- "Floating point values are not supported in the parser."
+ "Floating point values are not supported in the object mapper."
" Please implement them and be careful with type coercions"
)
# Literal[T]
elif is_literal(tp):
- expected = get_generic_type_arguments(tp)
- if obj in expected:
- return obj
- else:
- raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
+ return self._create_literal(tp, obj, object_path)
# Dict[K,V]
elif is_dict(tp):
- return self._validated_dict(tp, obj, object_path)
+ return self._create_dict(tp, obj, object_path)
# any Enums (probably used only internally in DataValidator)
elif is_enum(tp):
# List[T]
elif is_list(tp):
- if isinstance(obj, str):
- raise DataValidationError("expected list, got string", object_path)
- return self._validated_list(tp, obj, object_path)
+ return self._create_list(tp, obj, object_path)
# Tuple[A,B,C,D,...]
elif is_tuple(tp):
- return self._validated_tuple(tp, obj, object_path)
+ return self._create_tuple(tp, obj, object_path)
# type of obj and cls type match
elif is_obj_type(obj, tp):
# BaseValueType subclasses
elif inspect.isclass(tp) and issubclass(tp, BaseValueType):
- return self.construct_value_type(tp, obj, object_path)
+ return self.create_value_type_object(tp, obj, object_path)
# nested BaseSchema subclasses
- elif inspect.isclass(tp) and issubclass(tp, NoRenameBaseSchema):
- return self.construct_base_schema(tp, obj, object_path)
+ elif inspect.isclass(tp) and issubclass(tp, BaseSchema):
+ return self._create_base_schema_object(tp, obj, object_path)
# if the object matches, just pass it through
elif inspect.isclass(tp) and isinstance(obj, tp):
object_path,
)
- def construct_base_schema(self, tp: Type[Any], obj: Any, object_path: str) -> "NoRenameBaseSchema":
- if isinstance(obj, (dict, NoRenameBaseSchema)):
- return tp(obj, object_path=object_path) # type: ignore
- raise DataValidationError(
- f"expected 'dict' or 'NoRenameBaseSchema' object, found '{type(obj)}'", object_path
- )
-
- def construct_value_type(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseValueType":
- if isinstance(obj, tp):
- # if we already have a custom value type, just pass it through
- return obj
- else:
- # no validation performed, the implementation does it in the constuctor
- try:
- return tp(obj, object_path=object_path)
- except ValueError as e:
- if len(e.args) > 0 and isinstance(e.args[0], str):
- msg = e.args[0]
- else:
- msg = f"Failed to validate value against {tp} type"
- raise DataValidationError(msg, object_path) from e
-
- def load(self, clazz: Type[T], obj: Any, default: Any = ..., use_default: bool = False) -> T:
- return self.validated_object_type(clazz, obj, default, use_default)
-
- @classmethod
- def is_obj_type_valid(cls, obj: Any, tp: Type[Any]) -> bool:
+ def is_obj_type_valid(self, obj: Any, tp: Type[Any]) -> bool:
"""
Runtime type checking. Validate, that a given object is of a given type.
"""
try:
- cls().validated_object_type(tp, obj)
+ self.map_object(tp, obj)
return True
except (DataValidationError, ValueError):
return False
def _assign_default(self, obj: Any, name: str, python_type: Any, object_path: str) -> None:
cls = obj.__class__
default = getattr(cls, name, None)
- value = self.validated_object_type(python_type, default, object_path=f"{object_path}/{name}")
+ value = self.map_object(python_type, default, object_path=f"{object_path}/{name}")
setattr(obj, name, value)
def _assign_field(self, obj: Any, name: str, python_type: Any, value: Any, object_path: str) -> None:
- value = self.validated_object_type(python_type, value, object_path=f"{object_path}/{name}")
+ value = self.map_object(python_type, value, object_path=f"{object_path}/{name}")
setattr(obj, name, value)
- def _assign_fields(self, obj: Any, source: Union[Dict[str, Any], "NoRenameBaseSchema", None], object_path: str) -> Set[str]:
+ def _assign_fields(self, obj: Any, source: Union[Dict[str, Any], "BaseSchema", None], object_path: str) -> Set[str]:
"""
Order of assignment:
1. all direct assignments
msg = "Failed to validate value type"
raise DataValidationError(msg, object_path) from e
- def object_constructor(self, obj: Any, source: TSource, object_path: str) -> None:
- # make sure that all raw data checks passed on the source object
- if source is None:
- source = {}
+ def object_constructor(self, obj: Any, source: Union["BaseSchema", Dict[Any, Any]], object_path: str) -> None:
+ """
+ Delegated constructor for the NoRenameBaseSchema class.
- if not isinstance(source, (NoRenameBaseSchema, dict)):
- raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path)
+ The reason this method is delegated to the mapper is due to renaming. Like this, we don't have to
+ worry about a different BaseSchema class, when we want to have dynamically renamed fields.
+ """
+ # As this is a delegated constructor, we must ignore protected access warnings
+ # pylint: disable=protected-access
- # save source (2 underscores should invoke Python's build-in mangling and we wont hopefully have collistions with data fields)
- obj.__source: Union[Dict[str, Any], NoRenameBaseSchema] = source # type: ignore
+ # sanity check
+ if not isinstance(source, (BaseSchema, dict)): # type: ignore
+ raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path)
# construct lower level schema first if configured to do so
if obj._LAYER is not None:
used_keys = self._assign_fields(obj, source, object_path)
# check for unused keys in the source object
- if source and not isinstance(source, NoRenameBaseSchema):
+ if source and not isinstance(source, BaseSchema):
unused = source.keys() - used_keys
if len(unused) > 0:
keys = ", ".join((f"'{u}'" for u in unused))
raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path) from e
-class NoRenameBaseSchema(Serializable):
+class BaseSchema(Serializable):
"""
Base class for modeling configuration schema. It somewhat resembles standard dataclasses with additional
functionality:
See tests/utils/test_modelling.py for example usage.
"""
- _LAYER: Optional[Type["NoRenameBaseSchema"]] = None
- _MAPPER: Mapper = Mapper()
+ _LAYER: Optional[Type["BaseSchema"]] = None
+ _MAPPER: ObjectMapper = ObjectMapper()
+
+ def __init_subclass__(cls) -> None:
+ return super().__init_subclass__()
def __init__(self, source: TSource = None, object_path: str = ""):
+ # save source data (and drop information about nullness)
+ source = source or {}
+ self.__source: Union[Dict[str, Any], BaseSchema] = source
+
+ # delegate the rest of the constructor
self._MAPPER.object_constructor(self, source, object_path)
- self.__source: Union[Dict[str, Any], NoRenameBaseSchema]
def get_unparsed_data(self) -> Dict[str, Any]:
- if isinstance(self.__source, NoRenameBaseSchema):
+ if isinstance(self.__source, BaseSchema):
return self.__source.get_unparsed_data()
elif isinstance(self.__source, Renamed):
return self.__source.original()
return True
@classmethod
- def json_schema(cls: Type["NoRenameBaseSchema"], include_schema_definition: bool = True) -> Dict[Any, Any]:
+ def json_schema(cls: Type["BaseSchema"], include_schema_definition: bool = True) -> Dict[Any, Any]:
if cls._LAYER is not None:
return cls._LAYER.json_schema(include_schema_definition=include_schema_definition)
return res
-class RenamedMapper(Mapper):
- def _validated_dict(self, tp: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
+class RenamingObjectMapper(ObjectMapper):
+ """
+ Same as object mapper, but it uses collection wrappers from the module `renamed` to perform dynamic field renaming.
+
+ More specifically:
+ - it renames all properties in (nested) objects
+ - it does not rename keys in dictionaries
+ """
+
+ def _create_dict(self, tp: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
if isinstance(obj, Renamed):
obj = obj.original()
- return super()._validated_dict(tp, obj, object_path)
+ return super()._create_dict(tp, obj, object_path)
- def construct_base_schema(self, tp: Type[Any], obj: Any, object_path: str) -> "NoRenameBaseSchema":
+ def _create_base_schema_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseSchema":
if isinstance(obj, dict):
obj = renamed(obj)
- return super().construct_base_schema(tp, obj, object_path)
+ return super()._create_base_schema_object(tp, obj, object_path)
- def object_constructor(self, obj: Any, source: TSource, object_path: str) -> None:
+ def object_constructor(self, obj: Any, source: Union["BaseSchema", Dict[Any, Any]], object_path: str) -> None:
if isinstance(source, dict):
source = renamed(source)
return super().object_constructor(obj, source, object_path)
-# export as a standalone functions for backwards compatibility
-load = RenamedMapper().load
-is_obj_type_valid = RenamedMapper.is_obj_type_valid
+# export as a standalone functions for simplicity compatibility
+is_obj_type_valid = ObjectMapper().is_obj_type_valid
+map_object = ObjectMapper().map_object
-class BaseSchema(NoRenameBaseSchema):
+class ConfigSchema(BaseSchema):
"""
- In Knot Resolver Manager, we need renamed keys most of the time, as we are using the modelling
- tools mostly for configuration schema. That's why the normal looking name BaseSchema does renaming
- and NoRenameBaseSchema is the opposite.
+ Same as BaseSchema, but maps with RenamingObjectMapper
"""
- _MAPPER: Mapper = RenamedMapper()
+ _MAPPER: ObjectMapper = RenamingObjectMapper()