]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
json schema: added default values and doc strings
authorVasek Sraier <git@vakabus.cz>
Wed, 3 Nov 2021 12:41:02 +0000 (13:41 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:53 +0000 (16:17 +0200)
manager/knot_resolver_manager/datamodel/server_schema.py
manager/knot_resolver_manager/datamodel/types.py
manager/knot_resolver_manager/utils/modelling.py
manager/tests/datamodel/test_config_schema.py

index 9f386de41c51ab919c92ccad465726e68ad161d1..2e465ab5a79a614270d7ebe78ff4d309e9d199c2 100644 (file)
@@ -36,6 +36,10 @@ LogLevelEnum = LiteralEnum["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INF
 
 
 class ManagementSchema(SchemaNode):
+    """
+    Configuration of the Manager itself.
+    """
+
     # the default listen path here MUST use the default rundir
     listen: Listen = Listen({"unix-socket": "./manager.sock"})
     backend: BackendEnum = "auto"
index 96e7a362da7e0687a917d1291a4506c33d93207b..919562f4815d955df69bb44baf1997073206945f 100644 (file)
@@ -7,6 +7,7 @@ from typing import Any, Dict, Optional, Pattern, Type, Union
 
 from knot_resolver_manager.exceptions import SchemaException
 from knot_resolver_manager.utils import CustomValueType, SchemaNode
+from knot_resolver_manager.utils.modelling import Serializable
 
 logger = logging.getLogger(__name__)
 
@@ -139,7 +140,7 @@ class ListenType(Enum):
     INTERFACE_AND_PORT = auto()
 
 
-class Listen(SchemaNode):
+class Listen(SchemaNode, Serializable):
     class Raw(SchemaNode):
         ip: Optional[str] = None
         port: Optional[int] = None
@@ -213,6 +214,16 @@ class Listen(SchemaNode):
             and self.interface == o.interface
         )
 
+    def to_dict(self) -> Dict[Any, Any]:
+        if self.typ is ListenType.IP_AND_PORT:
+            return {"port": self.port, "ip": str(self.ip)}
+        elif self.typ is ListenType.UNIX_SOCKET:
+            return {"unix_socket": str(self.unix_socket)}
+        elif self.typ is ListenType.INTERFACE_AND_PORT:
+            return {"interface": self.interface, "port": self.port}
+        else:
+            raise NotImplementedError()
+
 
 class IPNetwork(CustomValueType):
     def __init__(self, source_value: Any, object_path: str = "/") -> None:
index 756a8c0df243f591b1184507cc54e4c48244cd0a..03aa947dd33ac7018217e367549237b25ea49895 100644 (file)
@@ -1,5 +1,6 @@
 import inspect
-from typing import Any, Dict, Optional, Set, Tuple, Type, Union
+from re import match
+from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union, cast
 
 from knot_resolver_manager.exceptions import DataException, SchemaException
 from knot_resolver_manager.utils.custom_types import CustomValueType
@@ -9,6 +10,7 @@ from knot_resolver_manager.utils.types import (
     NoneType,
     get_generic_type_argument,
     get_generic_type_arguments,
+    get_optional_inner_type,
     is_dict,
     is_enum,
     is_internal_field_name,
@@ -30,11 +32,59 @@ def is_obj_type(obj: Any, types: Union[type, Tuple[Any, ...], Tuple[type, ...]])
     return type(obj) == types
 
 
+class Serializable:
+    """
+    An interface for making classes serializable to a dictionary (and in turn into a JSON).
+    """
+
+    def to_dict(self) -> Dict[Any, Any]:
+        raise NotImplementedError(f"...for class {self.__class__.__name__}")
+
+    @staticmethod
+    def is_serializable(typ: Type[Any]) -> bool:
+        return (
+            typ in {str, bool, int, float}
+            or is_none_type(typ)
+            or is_literal(typ)
+            or is_dict(typ)
+            or is_list(typ)
+            or (inspect.isclass(typ) and issubclass(typ, Serializable))
+            or (inspect.isclass(typ) and issubclass(typ, CustomValueType))
+            or (inspect.isclass(typ) and issubclass(typ, SchemaNode))
+            or (is_optional(typ) and Serializable.is_serializable(get_optional_inner_type(typ)))
+            or (is_union(typ) and all_matches(lambda t: Serializable.is_serializable(t), get_generic_type_arguments(typ)))
+        )
+    
+    @staticmethod
+    def serialize(obj: Any, typ: Type[Any]) -> Any:
+        if inspect.isclass(typ) and issubclass(typ, Serializable):
+            return cast(Serializable, obj).to_dict()
+        
+        elif inspect.isclass(typ) and issubclass(typ, CustomValueType):
+            return cast(CustomValueType, obj).serialize()
+        
+        elif inspect.isclass(typ) and issubclass(typ, SchemaNode):
+            node = cast(SchemaNode, obj)
+            return node.serialize()
+        
+        elif is_list(typ):
+            lst = cast(List[Any], obj)
+            res: List[Any] = [Serializable.serialize(i, get_generic_type_argument(typ)) for i in lst]
+            return res
+
+        return obj
+
+
 def _get_properties_schema(typ: Type[Any]) -> Dict[Any, Any]:
     schema: Dict[Any, Any] = {}
     annot = typ.__dict__.get("__annotations__", {})
     for name, python_type in annot.items():
         schema[name] = _describe_type(python_type)
+        if hasattr(typ, name):
+            assert Serializable.is_serializable(
+                python_type
+            ), f"Type '{python_type}' does not appear to be JSON serializable"
+            schema[name]["default"] = Serializable.serialize(getattr(typ, name), python_type)
 
     return schema
 
@@ -62,7 +112,7 @@ def _describe_type(typ: Type[Any]) -> Dict[Any, Any]:
 
     elif is_literal(typ):
         val = get_generic_type_argument(typ)
-        return {"type": {str: "string", int: "integer", bool: "boolean"}[type(val)], "enum": [val]}
+        return {"type": {str: "string", int: "integer", bool: "boolean"}[type(val)], "const": val}
 
     elif is_union(typ):
         variants = get_generic_type_arguments(typ)
@@ -453,7 +503,18 @@ class SchemaNode:
         schema: Dict[Any, Any] = {}
         if include_schema_definition:
             schema["$schema"] = "https://json-schema.org/draft/2020-12/schema"
+        if cls.__doc__ is not None:
+            schema["description"] = cls.__doc__.strip()
         schema["type"] = "object"
         schema["properties"] = _get_properties_schema(cls)
 
         return schema
+    
+    def serialize(self) -> Dict[Any, Any]:
+        res: Dict[Any, Any] = {}
+        cls = self.__class__
+        annot = cls.__dict__.get("__annotations__", {})
+
+        for name, python_type in annot.items():
+            res[name] = Serializable.serialize(getattr(self, name), python_type)
+        return res
index 604f70ea133e76523630282c23935f685c273076..852e6eb04df07747b7771219bbbefabaecf7dfbd 100644 (file)
@@ -1,3 +1,5 @@
+import json
+from typing import Any, Dict, cast
 from knot_resolver_manager.datamodel import KresConfig
 from knot_resolver_manager.datamodel.types import IPv6Network96, TimeUnit
 
@@ -34,3 +36,23 @@ def test_dnssec_default_true():
     assert config.dnssec.trust_anchors == None
     assert config.dnssec.negative_trust_anchors == None
     assert config.dnssec.trust_anchors_files == None
+
+
+def test_json_schema():
+    dct = KresConfig.json_schema()
+
+    def recser(obj: Any, path: str = ''):
+        if not isinstance(obj, dict):
+            return
+        else:
+            obj = cast(Dict[Any, Any], obj)
+            for key in obj:
+                recser(obj[key], path=f"{path}/{key}")
+            try:
+                _ = json.dumps(obj)
+            except BaseException as e:
+                raise Exception(f"failed to serialize '{path}'") from e
+    
+    recser(dct)
+
+