]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
utils: modelling: added basic support for generating JSON schema
authorVasek Sraier <git@vakabus.cz>
Mon, 18 Oct 2021 11:36:19 +0000 (13:36 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:53 +0000 (16:17 +0200)
manager/knot_resolver_manager/datamodel/types.py
manager/knot_resolver_manager/server.py
manager/knot_resolver_manager/utils/__init__.py
manager/knot_resolver_manager/utils/custom_types.py
manager/knot_resolver_manager/utils/functional.py [new file with mode: 0644]
manager/knot_resolver_manager/utils/modelling.py

index acf8ec7a2b4cbea2ac04c1f74cd9f2808d32ebce..96e7a362da7e0687a917d1291a4506c33d93207b 100644 (file)
@@ -3,7 +3,7 @@ import logging
 import re
 from enum import Enum, auto
 from pathlib import Path
-from typing import Any, Dict, Optional, Pattern, Union
+from typing import Any, Dict, Optional, Pattern, Type, Union
 
 from knot_resolver_manager.exceptions import SchemaException
 from knot_resolver_manager.utils import CustomValueType, SchemaNode
@@ -13,7 +13,7 @@ logger = logging.getLogger(__name__)
 
 class Unit(CustomValueType):
     _re: Pattern[str]
-    _units: Dict[Optional[str], int]
+    _units: Dict[str, int]
 
     def __init__(self, source_value: Any, object_path: str = "/") -> None:
         super().__init__(source_value)
@@ -72,6 +72,10 @@ class Unit(CustomValueType):
     def serialize(self) -> Any:
         return self._value_orig
 
+    @classmethod
+    def json_schema(cls: Type["Unit"]) -> Dict[Any, Any]:
+        return {"type": "string", "pattern": r"\d+(" + "|".join(cls._units.keys()) + ")"}
+
 
 class SizeUnit(Unit):
     _re = re.compile(r"^([0-9]+)\s{0,1}([BKMG]){0,1}$")
@@ -122,6 +126,12 @@ class AnyPath(CustomValueType):
     def serialize(self) -> Any:
         return str(self._value)
 
+    @classmethod
+    def json_schema(cls: Type["AnyPath"]) -> Dict[Any, Any]:
+        return {
+            "type": "string",
+        }
+
 
 class ListenType(Enum):
     IP_AND_PORT = auto()
@@ -231,6 +241,12 @@ class IPNetwork(CustomValueType):
     def serialize(self) -> Any:
         return self._value.with_prefixlen
 
+    @classmethod
+    def json_schema(cls: Type["IPNetwork"]) -> Dict[Any, Any]:
+        return {
+            "type": "string",
+        }
+
 
 class IPv6Network96(CustomValueType):
     def __init__(self, source_value: Any, object_path: str = "/") -> None:
@@ -276,3 +292,7 @@ class IPv6Network96(CustomValueType):
 
     def to_std(self) -> ipaddress.IPv6Network:
         return self._value
+
+    @classmethod
+    def json_schema(cls: Type["IPv6Network96"]) -> Dict[Any, Any]:
+        return {"type": "string"}
index 03ad96d5cfbd1c874fb9b4cdbc972356d9d1aafb..ae72098325abf5a70abb0daac92ecdbad9c128a6 100644 (file)
@@ -114,6 +114,9 @@ class Server:
         # return success
         return web.Response()
 
+    async def _handler_schema(self, _request: web.Request) -> web.Response:
+        return web.json_response(KresConfig.json_schema())
+
     def _set_log_level(self, config: KresConfig):
         if self.log_level != config.server.management.log_level:
             # expects one existing log handler on the root
@@ -139,6 +142,7 @@ class Server:
                 web.get("/", self._handler_index),
                 web.post(r"/config{path:.*}", self._handler_apply_config),
                 web.post("/stop", self._handler_stop),
+                web.get("/schema", self._handler_schema),
             ]
         )
 
index 4662a5d169e2902349d05b054224411630c5ab02..26273fd23813a854e9afe6b4bcf9a3a35a5ae8dc 100644 (file)
@@ -1,4 +1,4 @@
-from typing import Any, Callable, Iterable, Optional, Type, TypeVar
+from typing import Any, Callable, Optional, Type, TypeVar
 
 from .custom_types import CustomValueType
 from .modelling import SchemaNode
@@ -41,17 +41,6 @@ def ignore_exceptions(
     return ignore_exceptions_optional(type(default), default, *exceptions)
 
 
-def foldl(oper: Callable[[T, T], T], default: T, arr: Iterable[T]) -> T:
-    val = default
-    for x in arr:
-        val = oper(val, x)
-    return val
-
-
-def contains_element_matching(cond: Callable[[T], bool], arr: Iterable[T]) -> bool:
-    return foldl(lambda x, y: x or y, False, map(cond, arr))
-
-
 __all__ = [
     "CustomValueType",
     "SchemaNode",
index c0328656925dd37d8d2b532a758218c8f2421a05..ea17c49e708929ea36600f5fe275283db96b51ef 100644 (file)
@@ -1,4 +1,4 @@
-from typing import Any
+from typing import Any, Dict, Type
 
 
 class CustomValueType:
@@ -38,3 +38,7 @@ class CustomValueType:
         to be the same semantically.
         """
         raise NotImplementedError(f"{type(self).__name__}'s' 'to_dict()' not implemented.")
+
+    @classmethod
+    def json_schema(cls: Type["CustomValueType"]) -> Dict[Any, Any]:
+        raise NotImplementedError()
diff --git a/manager/knot_resolver_manager/utils/functional.py b/manager/knot_resolver_manager/utils/functional.py
new file mode 100644 (file)
index 0000000..c8100ee
--- /dev/null
@@ -0,0 +1,18 @@
+from typing import Callable, Iterable, TypeVar
+
+T = TypeVar("T")
+
+
+def foldl(oper: Callable[[T, T], T], default: T, arr: Iterable[T]) -> T:
+    val = default
+    for x in arr:
+        val = oper(val, x)
+    return val
+
+
+def contains_element_matching(cond: Callable[[T], bool], arr: Iterable[T]) -> bool:
+    return foldl(lambda x, y: x or y, False, map(cond, arr))
+
+
+def all_matches(cond: Callable[[T], bool], arr: Iterable[T]) -> bool:
+    return foldl(lambda x, y: x and y, True, map(cond, arr))
index c4f0b91371f2b613f241c15ef1e32f3871c283b2..756a8c0df243f591b1184507cc54e4c48244cd0a 100644 (file)
@@ -3,6 +3,7 @@ from typing import Any, Dict, Optional, Set, Tuple, Type, Union
 
 from knot_resolver_manager.exceptions import DataException, SchemaException
 from knot_resolver_manager.utils.custom_types import CustomValueType
+from knot_resolver_manager.utils.functional import all_matches
 from knot_resolver_manager.utils.parsing import ParsedTree
 from knot_resolver_manager.utils.types import (
     NoneType,
@@ -29,6 +30,62 @@ def is_obj_type(obj: Any, types: Union[type, Tuple[Any, ...], Tuple[type, ...]])
     return type(obj) == types
 
 
+def _get_properties_schema(typ: Type[Any]) -> Dict[Any, Any]:
+    schema: Dict[Any, Any] = {}
+    annot = typ.__dict__.get("__annotations__", {})
+    for name, python_type in annot.items():
+        schema[name] = _describe_type(python_type)
+
+    return schema
+
+
+def _describe_type(typ: Type[Any]) -> Dict[Any, Any]:
+    # pylint: disable=too-many-branches
+
+    if inspect.isclass(typ) and issubclass(typ, SchemaNode):
+        return typ.json_schema(include_schema_definition=False)
+
+    elif inspect.isclass(typ) and issubclass(typ, CustomValueType):
+        return typ.json_schema()
+
+    elif is_none_type(typ):
+        return {"type": "null"}
+
+    elif typ == int:
+        return {"type": "integer"}
+
+    elif typ == bool:
+        return {"type": "boolean"}
+
+    elif typ == str:
+        return {"type": "string"}
+
+    elif is_literal(typ):
+        val = get_generic_type_argument(typ)
+        return {"type": {str: "string", int: "integer", bool: "boolean"}[type(val)], "enum": [val]}
+
+    elif is_union(typ):
+        variants = get_generic_type_arguments(typ)
+        # simplification for Union of Literals
+        if all_matches(is_literal, variants):
+            return {"enum": [get_generic_type_argument(literal) for literal in variants]}
+        else:
+            return {"anyOf": [_describe_type(v) for v in variants]}
+
+    elif is_list(typ):
+        return {"type": "array", "items": _describe_type(get_generic_type_argument(typ))}
+
+    elif is_dict(typ):
+        key, val = get_generic_type_arguments(typ)
+        assert key == str, "We currently do not support any other keys then strings"
+        return {"type": "object", "additionalProperties": _describe_type(val)}
+
+    elif is_enum(typ):
+        return {"type": "string", "enum": [str(v) for v in typ]}
+
+    raise NotImplementedError(f"Trying to get JSON schema for type '{typ}', which is not implemented")
+
+
 def _validated_object_type(
     cls: Type[Any], obj: Any, default: Any = ..., use_default: bool = False, object_path: str = "/"
 ) -> Any:
@@ -387,3 +444,16 @@ class SchemaNode:
         """
         Validation procedure called after all field are assigned. Should throw a ValueError in case of failure.
         """
+
+    @classmethod
+    def json_schema(cls: Type["SchemaNode"], include_schema_definition: bool = True) -> Dict[Any, Any]:
+        if cls._PREVIOUS_SCHEMA is not None:
+            return cls._PREVIOUS_SCHEMA.json_schema(include_schema_definition=include_schema_definition)
+
+        schema: Dict[Any, Any] = {}
+        if include_schema_definition:
+            schema["$schema"] = "https://json-schema.org/draft/2020-12/schema"
+        schema["type"] = "object"
+        schema["properties"] = _get_properties_schema(cls)
+
+        return schema