import re
from enum import Enum, auto
from pathlib import Path
-from typing import Any, Dict, Optional, Pattern, Union
+from typing import Any, Dict, Optional, Pattern, Type, Union
from knot_resolver_manager.exceptions import SchemaException
from knot_resolver_manager.utils import CustomValueType, SchemaNode
class Unit(CustomValueType):
_re: Pattern[str]
- _units: Dict[Optional[str], int]
+ _units: Dict[str, int]
def __init__(self, source_value: Any, object_path: str = "/") -> None:
super().__init__(source_value)
def serialize(self) -> Any:
return self._value_orig
+ @classmethod
+ def json_schema(cls: Type["Unit"]) -> Dict[Any, Any]:
+ return {"type": "string", "pattern": r"\d+(" + "|".join(cls._units.keys()) + ")"}
+
class SizeUnit(Unit):
_re = re.compile(r"^([0-9]+)\s{0,1}([BKMG]){0,1}$")
def serialize(self) -> Any:
return str(self._value)
+ @classmethod
+ def json_schema(cls: Type["AnyPath"]) -> Dict[Any, Any]:
+ return {
+ "type": "string",
+ }
+
class ListenType(Enum):
IP_AND_PORT = auto()
def serialize(self) -> Any:
return self._value.with_prefixlen
+ @classmethod
+ def json_schema(cls: Type["IPNetwork"]) -> Dict[Any, Any]:
+ return {
+ "type": "string",
+ }
+
class IPv6Network96(CustomValueType):
def __init__(self, source_value: Any, object_path: str = "/") -> None:
def to_std(self) -> ipaddress.IPv6Network:
return self._value
+
+ @classmethod
+ def json_schema(cls: Type["IPv6Network96"]) -> Dict[Any, Any]:
+ return {"type": "string"}
# return success
return web.Response()
+ async def _handler_schema(self, _request: web.Request) -> web.Response:
+ return web.json_response(KresConfig.json_schema())
+
def _set_log_level(self, config: KresConfig):
if self.log_level != config.server.management.log_level:
# expects one existing log handler on the root
web.get("/", self._handler_index),
web.post(r"/config{path:.*}", self._handler_apply_config),
web.post("/stop", self._handler_stop),
+ web.get("/schema", self._handler_schema),
]
)
-from typing import Any, Callable, Iterable, Optional, Type, TypeVar
+from typing import Any, Callable, Optional, Type, TypeVar
from .custom_types import CustomValueType
from .modelling import SchemaNode
return ignore_exceptions_optional(type(default), default, *exceptions)
-def foldl(oper: Callable[[T, T], T], default: T, arr: Iterable[T]) -> T:
- val = default
- for x in arr:
- val = oper(val, x)
- return val
-
-
-def contains_element_matching(cond: Callable[[T], bool], arr: Iterable[T]) -> bool:
- return foldl(lambda x, y: x or y, False, map(cond, arr))
-
-
__all__ = [
"CustomValueType",
"SchemaNode",
-from typing import Any
+from typing import Any, Dict, Type
class CustomValueType:
to be the same semantically.
"""
raise NotImplementedError(f"{type(self).__name__}'s' 'to_dict()' not implemented.")
+
+ @classmethod
+ def json_schema(cls: Type["CustomValueType"]) -> Dict[Any, Any]:
+ raise NotImplementedError()
--- /dev/null
+from typing import Callable, Iterable, TypeVar
+
+T = TypeVar("T")
+
+
+def foldl(oper: Callable[[T, T], T], default: T, arr: Iterable[T]) -> T:
+ val = default
+ for x in arr:
+ val = oper(val, x)
+ return val
+
+
+def contains_element_matching(cond: Callable[[T], bool], arr: Iterable[T]) -> bool:
+ return foldl(lambda x, y: x or y, False, map(cond, arr))
+
+
+def all_matches(cond: Callable[[T], bool], arr: Iterable[T]) -> bool:
+ return foldl(lambda x, y: x and y, True, map(cond, arr))
from knot_resolver_manager.exceptions import DataException, SchemaException
from knot_resolver_manager.utils.custom_types import CustomValueType
+from knot_resolver_manager.utils.functional import all_matches
from knot_resolver_manager.utils.parsing import ParsedTree
from knot_resolver_manager.utils.types import (
NoneType,
return type(obj) == types
+def _get_properties_schema(typ: Type[Any]) -> Dict[Any, Any]:
+ schema: Dict[Any, Any] = {}
+ annot = typ.__dict__.get("__annotations__", {})
+ for name, python_type in annot.items():
+ schema[name] = _describe_type(python_type)
+
+ return schema
+
+
+def _describe_type(typ: Type[Any]) -> Dict[Any, Any]:
+ # pylint: disable=too-many-branches
+
+ if inspect.isclass(typ) and issubclass(typ, SchemaNode):
+ return typ.json_schema(include_schema_definition=False)
+
+ elif inspect.isclass(typ) and issubclass(typ, CustomValueType):
+ return typ.json_schema()
+
+ elif is_none_type(typ):
+ return {"type": "null"}
+
+ elif typ == int:
+ return {"type": "integer"}
+
+ elif typ == bool:
+ return {"type": "boolean"}
+
+ elif typ == str:
+ return {"type": "string"}
+
+ elif is_literal(typ):
+ val = get_generic_type_argument(typ)
+ return {"type": {str: "string", int: "integer", bool: "boolean"}[type(val)], "enum": [val]}
+
+ elif is_union(typ):
+ variants = get_generic_type_arguments(typ)
+ # simplification for Union of Literals
+ if all_matches(is_literal, variants):
+ return {"enum": [get_generic_type_argument(literal) for literal in variants]}
+ else:
+ return {"anyOf": [_describe_type(v) for v in variants]}
+
+ elif is_list(typ):
+ return {"type": "array", "items": _describe_type(get_generic_type_argument(typ))}
+
+ elif is_dict(typ):
+ key, val = get_generic_type_arguments(typ)
+ assert key == str, "We currently do not support any other keys then strings"
+ return {"type": "object", "additionalProperties": _describe_type(val)}
+
+ elif is_enum(typ):
+ return {"type": "string", "enum": [str(v) for v in typ]}
+
+ raise NotImplementedError(f"Trying to get JSON schema for type '{typ}', which is not implemented")
+
+
def _validated_object_type(
cls: Type[Any], obj: Any, default: Any = ..., use_default: bool = False, object_path: str = "/"
) -> Any:
"""
Validation procedure called after all field are assigned. Should throw a ValueError in case of failure.
"""
+
+ @classmethod
+ def json_schema(cls: Type["SchemaNode"], include_schema_definition: bool = True) -> Dict[Any, Any]:
+ if cls._PREVIOUS_SCHEMA is not None:
+ return cls._PREVIOUS_SCHEMA.json_schema(include_schema_definition=include_schema_definition)
+
+ schema: Dict[Any, Any] = {}
+ if include_schema_definition:
+ schema["$schema"] = "https://json-schema.org/draft/2020-12/schema"
+ schema["type"] = "object"
+ schema["properties"] = _get_properties_schema(cls)
+
+ return schema