From: Vasek Sraier Date: Wed, 22 Sep 2021 08:46:14 +0000 (+0200) Subject: utils: modelling: self object in transformation functions cannot be accessed X-Git-Tag: v6.0.0a1~125^2~2^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6244bdd66c8afadd4a43db1c5db89350648d343d;p=thirdparty%2Fknot-resolver.git utils: modelling: self object in transformation functions cannot be accessed --- diff --git a/manager/knot_resolver_manager/exceptions.py b/manager/knot_resolver_manager/exceptions.py index e1576e910..02806fc9e 100644 --- a/manager/knot_resolver_manager/exceptions.py +++ b/manager/knot_resolver_manager/exceptions.py @@ -15,6 +15,9 @@ class TreeException(KresdManagerException): def where(self) -> str: return self._tree_path + + def __str__(self) -> str: + return super().__str__() + f" @ {self.where()}" class SchemaException(TreeException): diff --git a/manager/knot_resolver_manager/utils/modelling.py b/manager/knot_resolver_manager/utils/modelling.py index 830e05f73..860ad0266 100644 --- a/manager/knot_resolver_manager/utils/modelling.py +++ b/manager/knot_resolver_manager/utils/modelling.py @@ -63,7 +63,7 @@ def _validated_object_type( # after this, there is no place for a None object elif obj is None: - raise SchemaException(f"Unexpected None value for type {cls}", object_path) + raise SchemaException(f"Unexpected value 'None' for type {cls}", object_path) # int elif cls == int: @@ -180,35 +180,28 @@ def _validated_object_type( TSource = Union[NoneType, ParsedTree, "SchemaNode", Dict[str, Any]] +def create_untouchable(name: str): + class _Untouchable(object): + def __init__(self) -> None: + super().__init__() + def __getattribute__(self, item_name: str) -> Any: + raise RuntimeError(f"You are not supposed to access object '{name}'.") + def __setattr__(self, item_name: str, value: Any) -> None: + raise RuntimeError(f"You are not supposed to access object '{name}'.") + + return _Untouchable() class SchemaNode: _PREVIOUS_SCHEMA: Optional[Type["SchemaNode"]] = None - def _assign_default_fields(self) -> Set[str]: + def _assign_default(self, name: str, python_type: Any, object_path: str): cls = self.__class__ - annot = cls.__dict__.get("__annotations__", {}) - - used_keys: Set[str] = set() - for name in annot: - val = getattr(cls, name, ...) - if val is not ...: - setattr(self, name, val) - used_keys.add(name) - - if hasattr(self, f"_{name}"): - # check, that the schema makes sense - raise TypeError( - f"{cls.__name__}.{name}: can't have both default value and transformation function at once." - "Use _PREVIOUS_SCHEMA..." - ) - - return used_keys + default = getattr(cls, name, None) + value = _validated_object_type(python_type, default, object_path=f"{object_path}/{name}") + setattr(self, name, value) def _assign_field(self, name: str, python_type: Any, value: Any, object_path: str): - cls = self.__class__ - use_default = hasattr(cls, name) - default = getattr(cls, name, ...) - value = _validated_object_type(python_type, value, default, use_default, object_path=f"{object_path}/{name}") + value = _validated_object_type(python_type, value, object_path=f"{object_path}/{name}") setattr(self, name, value) def _assign_fields(self, source: Union[ParsedTree, "SchemaNode", NoneType], object_path: str) -> Set[str]: @@ -221,45 +214,37 @@ class SchemaNode: annot = cls.__dict__.get("__annotations__", {}) used_keys: Set[str] = set() - deffered: List[Tuple[str, Any]] = [] for name, python_type in annot.items(): if is_internal_field_name(name): continue # populate field - if not source: - self._assign_field(name, python_type, None, object_path) + if source is None: + self._assign_default(name, python_type, object_path) - # we have a way how to create the value + # there is a transformation function to create the value elif hasattr(self, f"_{name}"): - deffered.append((name, python_type)) + val = self._get_converted_value(name, source, object_path) + self._assign_field(name, python_type, val, object_path) + used_keys.add(name) # source just contains the value elif name in source: val = source[name] - used_keys.add(name) self._assign_field(name, python_type, val, object_path) + used_keys.add(name) - # there is a default value and in the source, the value is missing - elif getattr(self, name, ...) is not ...: - self._assign_field(name, python_type, None, object_path) - - # the value is optional and there is nothing - elif is_optional(python_type): - self._assign_field(name, python_type, None, object_path) + # there is a default value, or the type is optional => store the default or null + elif hasattr(self, name) or is_optional(python_type): + self._assign_default(name, python_type, object_path) # we expected a value but it was not there else: raise SchemaException(f"Missing attribute '{name}'.", object_path) - for name, python_type in deffered: - val = self._get_converted_value(name, source, object_path) - used_keys.add(name) # the field might not exist, but that won't break anything - self._assign_field(name, python_type, val, object_path) - return used_keys - def __init__(self, source: TSource = None, object_path: str = "/"): + def __init__(self, source: TSource = None, object_path: str = ""): # construct lower level schema node first if configured to do so if self._PREVIOUS_SCHEMA is not None: source = self._PREVIOUS_SCHEMA(source, object_path=object_path) # pylint: disable=not-callable @@ -269,8 +254,7 @@ class SchemaNode: source = ParsedTree(source) # assign fields - used_keys = self._assign_default_fields() - used_keys.update(self._assign_fields(source, object_path)) + used_keys = self._assign_fields(source, object_path) # check for unused keys in the source object if source and not isinstance(source, SchemaNode): @@ -287,7 +271,16 @@ class SchemaNode: def _get_converted_value(self, key: str, source: TSource, object_path: str) -> Any: try: - return getattr(self, f"_{key}")(source) + func = getattr(self.__class__, f"_{key}") + argc = len(inspect.signature(func).parameters) + if argc == 1: + # it is a static method + return func(source) + elif argc == 2: + # it is a instance method + return func(create_untouchable("self"), source) + else: + raise RuntimeError("Transformation function has wrong number of arguments") except (ValueError, DataException) as e: if len(e.args) > 0 and isinstance(e.args[0], str): msg = e.args[0]