]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
utils: modelling: self object in transformation functions cannot be accessed
authorVasek Sraier <git@vakabus.cz>
Wed, 22 Sep 2021 08:46:14 +0000 (10:46 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:53 +0000 (16:17 +0200)
manager/knot_resolver_manager/exceptions.py
manager/knot_resolver_manager/utils/modelling.py

index e1576e9107b8c19b324a940199f37c6591c9859f..02806fc9edbf015dcae268e1b948fb67e3b5696d 100644 (file)
@@ -15,6 +15,9 @@ class TreeException(KresdManagerException):
 
     def where(self) -> str:
         return self._tree_path
+    
+    def __str__(self) -> str:
+        return super().__str__() + f" @ {self.where()}"
 
 
 class SchemaException(TreeException):
index 830e05f733c915ecd1b2f705a1ae327eee26a50a..860ad02664543bb65aebbd5e5ee8f755430b38cb 100644 (file)
@@ -63,7 +63,7 @@ def _validated_object_type(
 
     # after this, there is no place for a None object
     elif obj is None:
-        raise SchemaException(f"Unexpected None value for type {cls}", object_path)
+        raise SchemaException(f"Unexpected value 'None' for type {cls}", object_path)
 
     # int
     elif cls == int:
@@ -180,35 +180,28 @@ def _validated_object_type(
 
 TSource = Union[NoneType, ParsedTree, "SchemaNode", Dict[str, Any]]
 
+def create_untouchable(name: str):
+    class _Untouchable(object):
+        def __init__(self) -> None:
+            super().__init__()
+        def __getattribute__(self, item_name: str) -> Any:
+            raise RuntimeError(f"You are not supposed to access object '{name}'.")
+        def __setattr__(self, item_name: str, value: Any) -> None:
+            raise RuntimeError(f"You are not supposed to access object '{name}'.")
+        
+    return _Untouchable()
 
 class SchemaNode:
     _PREVIOUS_SCHEMA: Optional[Type["SchemaNode"]] = None
 
-    def _assign_default_fields(self) -> Set[str]:
+    def _assign_default(self, name: str, python_type: Any, object_path: str):
         cls = self.__class__
-        annot = cls.__dict__.get("__annotations__", {})
-
-        used_keys: Set[str] = set()
-        for name in annot:
-            val = getattr(cls, name, ...)
-            if val is not ...:
-                setattr(self, name, val)
-                used_keys.add(name)
-
-                if hasattr(self, f"_{name}"):
-                    # check, that the schema makes sense
-                    raise TypeError(
-                        f"{cls.__name__}.{name}: can't have both default value and transformation function at once."
-                        "Use _PREVIOUS_SCHEMA..."
-                    )
-
-        return used_keys
+        default = getattr(cls, name, None)
+        value = _validated_object_type(python_type, default, object_path=f"{object_path}/{name}")
+        setattr(self, name, value)
 
     def _assign_field(self, name: str, python_type: Any, value: Any, object_path: str):
-        cls = self.__class__
-        use_default = hasattr(cls, name)
-        default = getattr(cls, name, ...)
-        value = _validated_object_type(python_type, value, default, use_default, object_path=f"{object_path}/{name}")
+        value = _validated_object_type(python_type, value, object_path=f"{object_path}/{name}")
         setattr(self, name, value)
 
     def _assign_fields(self, source: Union[ParsedTree, "SchemaNode", NoneType], object_path: str) -> Set[str]:
@@ -221,45 +214,37 @@ class SchemaNode:
         annot = cls.__dict__.get("__annotations__", {})
 
         used_keys: Set[str] = set()
-        deffered: List[Tuple[str, Any]] = []
         for name, python_type in annot.items():
             if is_internal_field_name(name):
                 continue
 
             # populate field
-            if not source:
-                self._assign_field(name, python_type, None, object_path)
+            if source is None:
+                self._assign_default(name, python_type, object_path)
 
-            # we have a way how to create the value
+            # there is a transformation function to create the value
             elif hasattr(self, f"_{name}"):
-                deffered.append((name, python_type))
+                val = self._get_converted_value(name, source, object_path)
+                self._assign_field(name, python_type, val, object_path)
+                used_keys.add(name)
 
             # source just contains the value
             elif name in source:
                 val = source[name]
-                used_keys.add(name)
                 self._assign_field(name, python_type, val, object_path)
+                used_keys.add(name)
 
-            # there is a default value and in the source, the value is missing
-            elif getattr(self, name, ...) is not ...:
-                self._assign_field(name, python_type, None, object_path)
-
-            # the value is optional and there is nothing
-            elif is_optional(python_type):
-                self._assign_field(name, python_type, None, object_path)
+            # there is a default value, or the type is optional => store the default or null
+            elif hasattr(self, name) or is_optional(python_type):
+                self._assign_default(name, python_type, object_path)
 
             # we expected a value but it was not there
             else:
                 raise SchemaException(f"Missing attribute '{name}'.", object_path)
 
-        for name, python_type in deffered:
-            val = self._get_converted_value(name, source, object_path)
-            used_keys.add(name)  # the field might not exist, but that won't break anything
-            self._assign_field(name, python_type, val, object_path)
-
         return used_keys
 
-    def __init__(self, source: TSource = None, object_path: str = "/"):
+    def __init__(self, source: TSource = None, object_path: str = ""):
         # construct lower level schema node first if configured to do so
         if self._PREVIOUS_SCHEMA is not None:
             source = self._PREVIOUS_SCHEMA(source, object_path=object_path)  # pylint: disable=not-callable
@@ -269,8 +254,7 @@ class SchemaNode:
             source = ParsedTree(source)
 
         # assign fields
-        used_keys = self._assign_default_fields()
-        used_keys.update(self._assign_fields(source, object_path))
+        used_keys = self._assign_fields(source, object_path)
 
         # check for unused keys in the source object
         if source and not isinstance(source, SchemaNode):
@@ -287,7 +271,16 @@ class SchemaNode:
 
     def _get_converted_value(self, key: str, source: TSource, object_path: str) -> Any:
         try:
-            return getattr(self, f"_{key}")(source)
+            func = getattr(self.__class__, f"_{key}")
+            argc = len(inspect.signature(func).parameters)
+            if argc == 1:
+                # it is a static method
+                return func(source)
+            elif argc == 2:
+                # it is a instance method
+                return func(create_untouchable("self"), source)
+            else:
+                raise RuntimeError("Transformation function has wrong number of arguments")
         except (ValueError, DataException) as e:
             if len(e.args) > 0 and isinstance(e.args[0], str):
                 msg = e.args[0]