- ``GET /metrics`` provides Prometheus metrics
- ``GET /`` static response that could be used to determine, whether the Manager is running
- ``POST /stop`` gracefully stops the Manager, empty request body
-- ``{GET,POST,PUT,DELETE,PATCH} /v1/config`` allows reading and modifying current configuration
+- ``{GET,PUT,DELETE,PATCH} /v1/config`` allows reading and modifying current configuration
Config modification endpoint (v1)
Note: The ``v1`` version qualifier is there for future-proofing. We don't have any plans at the moment to change the API any time soon. If that happens, we will support both old and new API versions for the some transition period.
-The API by default expects JSON, but can also parse YAML when the ``Content-Type`` header is set to ``text/vnd.yaml``. The return value is always a JSON with ``Content-Type: application/json``. The schema of input and output is always a subtree of the configuration data model which is described by the JSON schema exposed at ``/schema``.
+The API by default expects JSON, but can also parse YAML when the ``Content-Type`` header is set to ``application/yaml`` or ``text/vnd.yaml``. The return value is always a JSON with ``Content-Type: application/json``. The schema of input and output is always a subtree of the configuration data model which is described by the JSON schema exposed at ``/schema``.
The API can operate on any configuration subtree by specifying a `JSON pointer <https://www.rfc-editor.org/rfc/rfc6901>`_ in the URL path (property names and list indices joined with ``/``). For example, to get the number of worker processes, you can send ``GET`` request to ``v1/config/workers``.
The different HTTP methods perform different modifications of the configuration:
- ``GET`` return subtree of the current configuration
-- ``POST`` set property value or append to a list
-- ``DELETE`` removes the given property or list index
-- ``PATCH`` set property or list item value, fails when not present
-- ``PUT`` set property, never replaces an existing value
-
-TODO Make sure we follow the JSON pointer RFC (we do not do that exactly at this point)
-TODO consider using JSON Patch format instead
-TODO this is not tested properly
-TODO consider using JSON Patch the ``PATCH`` method
-TODO there is probably a bug in query.py (see FIXME comment there), verify that
+- ``PUT`` set property
+- ``DELETE`` removes the given property or list item at the given index
+- ``PATCH`` updates the configuration using `JSON Patch <https://jsonpatch.com/>_`
To prevent race conditions when changing configuration from multiple clients simultaneously, every response from the Manager has an ``ETag`` header set. Requests then accept ``If-Match`` and ``If-None-Match`` headers with the latest ``ETag`` value and the corresponding request processing fails with HTTP error code 412 (precondition failed).
if not self.path.startswith("/"):
self.path = "/" + self.path
- method: Literal["GET", "POST"] = "GET" if self.replacement_value is None else "POST"
+ method: Literal["GET", "PUT"] = "GET" if self.replacement_value is None else "PUT"
url = f"{args.socket}/v1/config{self.path}"
response = request(method, url, self.replacement_value)
print(response)
from knot_resolver_manager.datamodel.types import CheckedPath, TimeUnit
from knot_resolver_manager.utils.modeling import BaseSchema
-from knot_resolver_manager.utils.modeling.base_schema import is_obj_type_Valid
+from knot_resolver_manager.utils.modeling.base_schema import is_obj_type_valid
try:
# On Debian 10, the typing_extensions library does not contain TypeAlias.
def _target(self, raw: Raw) -> LogTargetEnum:
if raw.target == "from-env":
target = os.environ.get("KRES_LOGGING_TARGET") or "stdout"
- if not is_obj_type_Valid(target, cast(Type[Any], LogTargetEnum)):
+ if not is_obj_type_valid(target, cast(Type[Any], LogTargetEnum)):
raise ValueError(f"logging target '{target}' read from $KRES_LOGGING_TARGET is invalid")
return cast(LogTargetEnum, target)
else:
from http import HTTPStatus
from pathlib import Path
from time import time
-from typing import Any, List, Optional, Set, Union, cast
+from typing import Any, Dict, List, Optional, Set, Union, cast
from aiohttp import web
from aiohttp.web import middleware
from knot_resolver_manager.kresd_controller import get_best_controller_implementation
from knot_resolver_manager.utils import ignore_exceptions_optional
from knot_resolver_manager.utils.async_utils import readfile
+from knot_resolver_manager.utils.etag import structural_etag
from knot_resolver_manager.utils.functional import Result
-from knot_resolver_manager.utils.modeling import ParsedTree, parse, parse_yaml
from knot_resolver_manager.utils.modeling.exceptions import DataParsingError, DataValidationError
+from knot_resolver_manager.utils.modeling.parsing import parse, parse_yaml
+from knot_resolver_manager.utils.modeling.query import query
from knot_resolver_manager.utils.modeling.types import NoneType
from knot_resolver_manager.utils.systemd_notify import systemd_notify
# parse the incoming data
if request.method == "GET":
- update_with: Optional[ParsedTree] = None
+ update_with: Optional[Dict[str, Any]] = None
else:
update_with = parse(await request.text(), request.content_type)
document_path = request.match_info["path"]
getheaders = ignore_exceptions_optional(List[str], None, KeyError)(request.headers.getall)
etags = getheaders("if-match")
not_etags = getheaders("if-none-match")
- current_config: ParsedTree = self.config_store.get().get_unparsed_data()
+ current_config: Dict[str, Any] = self.config_store.get().get_unparsed_data()
# stop processing if etags
def strip_quotes(s: str) -> str:
# WARNING: this check is prone to race conditions. When changing, make sure that the current config
# is really the latest current config (i.e. no await in between obtaining the config and the checks)
status = HTTPStatus.NOT_MODIFIED if request.method in ("GET", "HEAD") else HTTPStatus.PRECONDITION_FAILED
- if etags is not None and current_config.etag not in map(strip_quotes, etags):
+ if etags is not None and structural_etag(current_config) not in map(strip_quotes, etags):
return web.Response(status=status)
- if not_etags is not None and current_config.etag in map(strip_quotes, not_etags):
+ if not_etags is not None and structural_etag(current_config) in map(strip_quotes, not_etags):
return web.Response(status=status)
# run query
- op = cast(Literal["get", "post", "delete", "patch", "put"], request.method.lower())
- new_config, to_return = current_config.query(op, document_path, update_with)
+ op = cast(Literal["get", "delete", "patch", "put"], request.method.lower())
+ new_config, to_return = query(current_config, op, document_path, update_with)
# update the config
if request.method != "GET":
# return success
resp_text: Optional[str] = str(to_return) if to_return is not None else None
res = web.Response(status=HTTPStatus.OK, text=resp_text, content_type="application/json")
- res.headers.add("ETag", f'"{new_config.etag}"')
+ res.headers.add("ETag", f'"{structural_etag(new_config)}"')
return res
async def _handler_metrics(self, _request: web.Request) -> web.Response:
self.app.add_routes(
[
web.get("/", self._handler_index),
- web.post(r"/v1/config{path:.*}", self._handler_config_query),
- web.put(r"/v1/config{path:.*}", self._handler_config_query),
- web.patch(r"/v1/config{path:.*}", self._handler_config_query),
web.get(r"/v1/config{path:.*}", self._handler_config_query),
+ web.put(r"/v1/config{path:.*}", self._handler_config_query),
web.delete(r"/v1/config{path:.*}", self._handler_config_query),
+ web.patch(r"/v1/config{path:.*}", self._handler_config_query),
web.post("/stop", self._handler_stop),
web.get("/schema", self._handler_schema),
web.get("/schema/ui", self._handle_view_schema),
return self._exit_code
-async def _load_raw_config(config: Union[Path, ParsedTree]) -> ParsedTree:
+async def _load_raw_config(config: Union[Path, Dict[str, Any]]) -> Dict[str, Any]:
# Initial configuration of the manager
if isinstance(config, Path):
if not config.exists():
config = parse_yaml(await readfile(config))
# validate the initial configuration
- assert isinstance(config, ParsedTree)
+ assert isinstance(config, dict)
return config
-async def _load_config(config: ParsedTree) -> KresConfig:
+async def _load_config(config: Dict[str, Any]) -> KresConfig:
logger.info("Validating initial configuration...")
config_validated = KresConfig(config)
return config_validated
-async def _init_config_store(config: ParsedTree) -> ConfigStore:
+async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
config_validated = await _load_config(config)
config_store = ConfigStore(config_validated)
return config_store
return Result.ok(None)
-def _set_working_directory(config_raw: ParsedTree) -> None:
+def _set_working_directory(config_raw: Dict[str, Any]) -> None:
config = KresConfig(config_raw)
if not config.rundir.to_path().exists():
sys.exit(128 + signal.SIGTERM)
-async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_FILE) -> int:
+async def start_server(config: Union[Path, Dict[str, Any]] = DEFAULT_MANAGER_CONFIG_FILE) -> int:
# This function is quite long, but it describes how manager runs. So let's silence pylint
# pylint: disable=too-many-statements
--- /dev/null
+import base64
+import json
+from hashlib import blake2b
+from typing import Any
+
+
+def structural_etag(obj: Any) -> str:
+ m = blake2b(digest_size=15)
+ m.update(json.dumps(obj, sort_keys=True).encode("utf8"))
+ return base64.urlsafe_b64encode(m.digest()).decode("utf8")
```
To parse data from YAML format just use `parse_yaml` function or `parse_json` for JSON format.
-Parsed data are represented as `ParsedTree` which is a simple wrapper for dict-like object that takes care of `-`/`_` conversion.
+Parsed data are stored in a dict-like object that takes care of `-`/`_` conversion.
```python
from .modeling import parse_yaml
from .base_schema import BaseSchema
from .base_value_type import BaseValueType
-from .parsing import ParsedTree, parse, parse_json, parse_yaml
-from .query import QueryTree
+from .parsing import parse, parse_json, parse_yaml
__all__ = [
"BaseValueType",
"BaseSchema",
- "ParsedTree",
"parse",
"parse_yaml",
"parse_json",
- "QueryTree",
]
import enum
import inspect
-from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union, cast
+from typing import Any, Dict, List, Optional, Set, Tuple, Type, TypeVar, Union, cast
import yaml
from .base_value_type import BaseValueType
from .exceptions import AggregateDataValidationError, DataDescriptionError, DataValidationError
-from .parsing import ParsedTree
+from .renaming import Renamed, renamed
from .types import (
NoneType,
get_generic_type_argument,
raise NotImplementedError(f"Trying to get JSON schema for type '{typ}', which is not implemented")
-def _validated_tuple(cls: Type[Any], obj: Tuple[Any, ...], object_path: str) -> Tuple[Any, ...]:
- types = get_generic_type_arguments(cls)
- errs: List[DataValidationError] = []
- res: List[Any] = []
- for i, (tp, val) in enumerate(zip(types, obj)):
- try:
- res.append(_validated_object_type(tp, val, object_path=f"{object_path}[{i}]"))
- except DataValidationError as e:
- errs.append(e)
- if len(errs) == 1:
- raise errs[0]
- elif len(errs) > 1:
- raise AggregateDataValidationError(object_path, child_exceptions=errs)
- return tuple(res)
-
-
-def _validated_dict(cls: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
- key_type, val_type = get_generic_type_arguments(cls)
- try:
- errs: List[DataValidationError] = []
- res: Dict[Any, Any] = {}
- for key, val in obj.items():
- try:
- nkey = _validated_object_type(key_type, key, object_path=f"{object_path}[{key}]")
- nval = _validated_object_type(val_type, val, object_path=f"{object_path}[{key}]")
- res[nkey] = nval
- except DataValidationError as e:
- errs.append(e)
- if len(errs) == 1:
- raise errs[0]
- elif len(errs) > 1:
- raise AggregateDataValidationError(object_path, child_exceptions=errs)
- return res
- except AttributeError as e:
- raise DataValidationError(
- f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
- ) from e
-
-
-def _validated_list(cls: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
- inner_type = get_generic_type_argument(cls)
- errs: List[DataValidationError] = []
- res: List[Any] = []
- for i, val in enumerate(obj):
- try:
- res.append(_validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
- except DataValidationError as e:
- errs.append(e)
- if len(errs) == 1:
- raise errs[0]
- elif len(errs) > 1:
- raise AggregateDataValidationError(object_path, child_exceptions=errs)
- return res
-
-
-def _validated_object_type(
- cls: Type[Any], obj: Any, default: Any = ..., use_default: bool = False, object_path: str = "/"
-) -> Any:
- """
- Given an expected type `cls` and a value object `obj`, validate the type of `obj` and return it
- """
-
- # Disabling these checks, because I think it's much more readable as a single function
- # and it's not that large at this point. If it got larger, then we should definitely split it
- # pylint: disable=too-many-branches,too-many-locals,too-many-statements
-
- # default values
- if obj is None and use_default:
- return default
-
- # NoneType
- elif is_none_type(cls):
- if obj is None:
- return None
- else:
- raise DataValidationError(f"expected None, found '{obj}'.", object_path)
-
- # Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
- elif is_optional(cls):
- inner: Type[Any] = get_optional_inner_type(cls)
- if obj is None:
- return None
- else:
- return _validated_object_type(inner, obj, object_path=object_path)
-
- # Union[*variants]
- elif is_union(cls):
- variants = get_generic_type_arguments(cls)
- errs: List[DataValidationError] = []
- for v in variants:
- try:
- return _validated_object_type(v, obj, object_path=object_path)
- except DataValidationError as e:
- errs.append(e)
-
- raise DataValidationError("could not parse any of the possible variants", object_path, child_exceptions=errs)
-
- # after this, there is no place for a None object
- elif obj is None:
- raise DataValidationError(f"unexpected value 'None' for type {cls}", object_path)
-
- # int
- elif cls == int:
- # we don't want to make an int out of anything else than other int
- # except for BaseValueType class instances
- if is_obj_type(obj, int) or isinstance(obj, BaseValueType):
- return int(obj)
- raise DataValidationError(f"expected int, found {type(obj)}", object_path)
-
- # str
- elif cls == str:
- # we are willing to cast any primitive value to string, but no compound values are allowed
- if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
- return str(obj)
- elif is_obj_type(obj, bool):
- raise DataValidationError(
- "Expected str, found bool. Be careful, that YAML parsers consider even"
- ' "no" and "yes" as a bool. Search for the Norway Problem for more'
- " details. And please use quotes explicitly.",
- object_path,
- )
- else:
- raise DataValidationError(
- f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
- )
-
- # bool
- elif cls == bool:
- if is_obj_type(obj, bool):
- return obj
- else:
- raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
-
- # float
- elif cls == float:
- raise NotImplementedError(
- "Floating point values are not supported in the parser."
- " Please implement them and be careful with type coercions"
- )
-
- # Literal[T]
- elif is_literal(cls):
- expected = get_generic_type_arguments(cls)
- if obj in expected:
- return obj
- else:
- raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
-
- # Dict[K,V]
- elif is_dict(cls):
- return _validated_dict(cls, obj, object_path)
-
- # any Enums (probably used only internally in DataValidator)
- elif is_enum(cls):
- if isinstance(obj, cls):
- return obj
- else:
- raise DataValidationError(f"unexpected value '{obj}' for enum '{cls}'", object_path)
-
- # List[T]
- elif is_list(cls):
- if isinstance(obj, str):
- raise DataValidationError("expected list, got string", object_path)
- return _validated_list(cls, obj, object_path)
-
- # Tuple[A,B,C,D,...]
- elif is_tuple(cls):
- return _validated_tuple(cls, obj, object_path)
-
- # type of obj and cls type match
- elif is_obj_type(obj, cls):
- return obj
-
- # BaseValueType subclasses
- elif inspect.isclass(cls) and issubclass(cls, BaseValueType):
- if isinstance(obj, cls):
- # if we already have a custom value type, just pass it through
- return obj
- else:
- # no validation performed, the implementation does it in the constuctor
- try:
- return cls(obj, object_path=object_path)
- except ValueError as e:
- if len(e.args) > 0 and isinstance(e.args[0], str):
- msg = e.args[0]
- else:
- msg = f"Failed to validate value against {cls} type"
- raise DataValidationError(msg, object_path) from e
-
- # nested BaseSchema subclasses
- elif inspect.isclass(cls) and issubclass(cls, BaseSchema):
- # we should return DataParser, we expect to be given a dict,
- # because we can construct a DataParser from it
- if isinstance(obj, (dict, BaseSchema)):
- return cls(obj, object_path=object_path) # type: ignore
- raise DataValidationError(f"expected 'dict' or 'BaseSchema' object, found '{type(obj)}'", object_path)
-
- # if the object matches, just pass it through
- elif inspect.isclass(cls) and isinstance(obj, cls):
- return obj
-
- # default error handler
- else:
- raise DataValidationError(
- f"Type {cls} cannot be parsed. This is a implementation error. "
- "Please fix your types in the class or improve the parser/validator.",
- object_path,
- )
-
-
-TSource = Union[NoneType, ParsedTree, "BaseSchema", Dict[str, Any]]
+TSource = Union[NoneType, "NoRenameBaseSchema", Dict[str, Any]]
def _create_untouchable(name: str) -> object:
return _Untouchable()
-class BaseSchema(Serializable):
+class NoRenameBaseSchema(Serializable):
"""
Base class for modeling configuration schema. It somewhat resembles standard dataclasses with additional
functionality:
* data conversion
To create an instance of this class, you have to provide source data in the form of dict-like object.
- Generally, we expect `ParsedTree`, raw dict or another `BaseSchema` instance. The provided data object
- is traversed, transformed and validated before assigned to the appropriate fields (attributes).
+ Generally, raw dict or another `BaseSchema` instance. The provided data object is traversed, transformed
+ and validated before assigned to the appropriate fields (attributes).
Fields (attributes)
===================
See tests/utils/test_modelling.py for example usage.
"""
- _LAYER: Optional[Type["BaseSchema"]] = None
+ _LAYER: Optional[Type["NoRenameBaseSchema"]] = None
def _assign_default(self, name: str, python_type: Any, object_path: str) -> None:
cls = self.__class__
default = getattr(cls, name, None)
- value = _validated_object_type(python_type, default, object_path=f"{object_path}/{name}")
+ value = type(self).validated_object_type(python_type, default, object_path=f"{object_path}/{name}")
setattr(self, name, value)
def _assign_field(self, name: str, python_type: Any, value: Any, object_path: str) -> None:
- value = _validated_object_type(python_type, value, object_path=f"{object_path}/{name}")
+ value = type(self).validated_object_type(python_type, value, object_path=f"{object_path}/{name}")
setattr(self, name, value)
- def _assign_fields(self, source: Union[ParsedTree, "BaseSchema", NoneType], object_path: str) -> Set[str]:
+ def _assign_fields(self, source: Union[Dict[str, Any], "NoRenameBaseSchema", None], object_path: str) -> Set[str]:
"""
Order of assignment:
1. all direct assignments
def __init__(self, source: TSource = None, object_path: str = ""):
# make sure that all raw data checks passed on the source object
if source is None:
- source = ParsedTree({})
- if isinstance(source, dict):
- source = ParsedTree(source)
+ source = {}
- # save source
- self._source: Union[ParsedTree, BaseSchema] = source
+ if not isinstance(source, (NoRenameBaseSchema, dict)):
+ raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path)
+
+ # save source (3 underscores to prevent collisions with any user defined conversion methods or system methods)
+ self.___source: Union[Dict[str, Any], NoRenameBaseSchema] = source
# construct lower level schema first if configured to do so
if self._LAYER is not None:
source = self._LAYER(source, object_path=object_path) # pylint: disable=not-callable
- # prevent failure when user provides a different type than object
- if isinstance(source, ParsedTree) and not source.is_dict():
- raise DataValidationError(f"expected object, found '{source.type()}'", object_path)
-
# assign fields
used_keys = self._assign_fields(source, object_path)
# check for unused keys in the source object
- if source and not isinstance(source, BaseSchema):
+ if source and not isinstance(source, NoRenameBaseSchema):
unused = source.keys() - used_keys
if len(unused) > 0:
keys = ", ".join((f"'{u}'" for u in unused))
except ValueError as e:
raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path) from e
- def get_unparsed_data(self) -> ParsedTree:
- if isinstance(self._source, BaseSchema):
- return self._source.get_unparsed_data()
+ def get_unparsed_data(self) -> Dict[str, Any]:
+ if isinstance(self.___source, NoRenameBaseSchema):
+ return self.___source.get_unparsed_data()
+ elif isinstance(self.___source, Renamed):
+ return self.___source.original()
else:
- return self._source
+ return self.___source
def _get_converted_value(self, key: str, source: TSource, object_path: str) -> Any:
"""
return True
@classmethod
- def json_schema(cls: Type["BaseSchema"], include_schema_definition: bool = True) -> Dict[Any, Any]:
+ def json_schema(cls: Type["NoRenameBaseSchema"], include_schema_definition: bool = True) -> Dict[Any, Any]:
if cls._LAYER is not None:
return cls._LAYER.json_schema(include_schema_definition=include_schema_definition)
res[name] = Serializable.serialize(getattr(self, name))
return res
+ @classmethod
+ def _validated_tuple(
+ cls: Type["NoRenameBaseSchema"], tp: Type[Any], obj: Tuple[Any, ...], object_path: str
+ ) -> Tuple[Any, ...]:
+ types = get_generic_type_arguments(tp)
+ errs: List[DataValidationError] = []
+ res: List[Any] = []
+ for i, (t, val) in enumerate(zip(types, obj)):
+ try:
+ res.append(cls.validated_object_type(t, val, object_path=f"{object_path}[{i}]"))
+ except DataValidationError as e:
+ errs.append(e)
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ return tuple(res)
-def is_obj_type_Valid(obj: Any, tp: Type[Any]) -> bool:
+ @classmethod
+ def _validated_dict(
+ cls: Type["NoRenameBaseSchema"], tp: Type[Any], obj: Dict[Any, Any], object_path: str
+ ) -> Dict[Any, Any]:
+ key_type, val_type = get_generic_type_arguments(tp)
+ try:
+ errs: List[DataValidationError] = []
+ res: Dict[Any, Any] = {}
+ for key, val in obj.items():
+ try:
+ nkey = cls.validated_object_type(key_type, key, object_path=f"{object_path}[{key}]")
+ nval = cls.validated_object_type(val_type, val, object_path=f"{object_path}[{key}]")
+ res[nkey] = nval
+ except DataValidationError as e:
+ errs.append(e)
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ return res
+ except AttributeError as e:
+ raise DataValidationError(
+ f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
+ ) from e
+
+ @classmethod
+ def _validated_list(cls: Type["NoRenameBaseSchema"], tp: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
+ inner_type = get_generic_type_argument(tp)
+ errs: List[DataValidationError] = []
+ res: List[Any] = []
+ for i, val in enumerate(obj):
+ try:
+ res.append(cls.validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
+ except DataValidationError as e:
+ errs.append(e)
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ return res
+
+ @classmethod
+ def validated_object_type(
+ cls: Type["NoRenameBaseSchema"],
+ tp: Type[Any],
+ obj: Any,
+ default: Any = ...,
+ use_default: bool = False,
+ object_path: str = "/",
+ ) -> Any:
+ """
+ Given an expected type `cls` and a value object `obj`, validate the type of `obj` and return it
+ """
+
+ # Disabling these checks, because I think it's much more readable as a single function
+ # and it's not that large at this point. If it got larger, then we should definitely split it
+ # pylint: disable=too-many-branches,too-many-locals,too-many-statements
+
+ # default values
+ if obj is None and use_default:
+ return default
+
+ # NoneType
+ elif is_none_type(tp):
+ if obj is None:
+ return None
+ else:
+ raise DataValidationError(f"expected None, found '{obj}'.", object_path)
+
+ # Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
+ elif is_optional(tp):
+ inner: Type[Any] = get_optional_inner_type(tp)
+ if obj is None:
+ return None
+ else:
+ return cls.validated_object_type(inner, obj, object_path=object_path)
+
+ # Union[*variants]
+ elif is_union(tp):
+ variants = get_generic_type_arguments(tp)
+ errs: List[DataValidationError] = []
+ for v in variants:
+ try:
+ return cls.validated_object_type(v, obj, object_path=object_path)
+ except DataValidationError as e:
+ errs.append(e)
+
+ raise DataValidationError(
+ "could not parse any of the possible variants", object_path, child_exceptions=errs
+ )
+
+ # after this, there is no place for a None object
+ elif obj is None:
+ raise DataValidationError(f"unexpected value 'None' for type {tp}", object_path)
+
+ # int
+ elif tp == int:
+ # we don't want to make an int out of anything else than other int
+ # except for BaseValueType class instances
+ if is_obj_type(obj, int) or isinstance(obj, BaseValueType):
+ return int(obj)
+ raise DataValidationError(f"expected int, found {type(obj)}", object_path)
+
+ # str
+ elif tp == str:
+ # we are willing to cast any primitive value to string, but no compound values are allowed
+ if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
+ return str(obj)
+ elif is_obj_type(obj, bool):
+ raise DataValidationError(
+ "Expected str, found bool. Be careful, that YAML parsers consider even"
+ ' "no" and "yes" as a bool. Search for the Norway Problem for more'
+ " details. And please use quotes explicitly.",
+ object_path,
+ )
+ else:
+ raise DataValidationError(
+ f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
+ )
+
+ # bool
+ elif tp == bool:
+ if is_obj_type(obj, bool):
+ return obj
+ else:
+ raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
+
+ # float
+ elif tp == float:
+ raise NotImplementedError(
+ "Floating point values are not supported in the parser."
+ " Please implement them and be careful with type coercions"
+ )
+
+ # Literal[T]
+ elif is_literal(tp):
+ expected = get_generic_type_arguments(tp)
+ if obj in expected:
+ return obj
+ else:
+ raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
+
+ # Dict[K,V]
+ elif is_dict(tp):
+ return cls._validated_dict(tp, obj, object_path)
+
+ # any Enums (probably used only internally in DataValidator)
+ elif is_enum(tp):
+ if isinstance(obj, tp):
+ return obj
+ else:
+ raise DataValidationError(f"unexpected value '{obj}' for enum '{tp}'", object_path)
+
+ # List[T]
+ elif is_list(tp):
+ if isinstance(obj, str):
+ raise DataValidationError("expected list, got string", object_path)
+ return cls._validated_list(tp, obj, object_path)
+
+ # Tuple[A,B,C,D,...]
+ elif is_tuple(tp):
+ return cls._validated_tuple(tp, obj, object_path)
+
+ # type of obj and cls type match
+ elif is_obj_type(obj, tp):
+ return obj
+
+ # when the specified type is Any, just return the given value
+ # (pylint does something weird on the following line and it happens only on python 3.10)
+ elif tp == Any: # pylint: disable=comparison-with-callable
+ return obj
+
+ # BaseValueType subclasses
+ elif inspect.isclass(tp) and issubclass(tp, BaseValueType):
+ if isinstance(obj, tp):
+ # if we already have a custom value type, just pass it through
+ return obj
+ else:
+ # no validation performed, the implementation does it in the constuctor
+ try:
+ return tp(obj, object_path=object_path)
+ except ValueError as e:
+ if len(e.args) > 0 and isinstance(e.args[0], str):
+ msg = e.args[0]
+ else:
+ msg = f"Failed to validate value against {tp} type"
+ raise DataValidationError(msg, object_path) from e
+
+ # nested BaseSchema subclasses
+ elif inspect.isclass(tp) and issubclass(tp, NoRenameBaseSchema):
+ # we should return DataParser, we expect to be given a dict,
+ # because we can construct a DataParser from it
+ if isinstance(obj, (dict, NoRenameBaseSchema)):
+ return tp(obj, object_path=object_path) # type: ignore
+ raise DataValidationError(
+ f"expected 'dict' or 'NoRenameBaseSchema' object, found '{type(obj)}'", object_path
+ )
+
+ # if the object matches, just pass it through
+ elif inspect.isclass(tp) and isinstance(obj, tp):
+ return obj
+
+ # default error handler
+ else:
+ raise DataValidationError(
+ f"Type {tp} cannot be parsed. This is a implementation error. "
+ "Please fix your types in the class or improve the parser/validator.",
+ object_path,
+ )
+
+
+def is_obj_type_valid(obj: Any, tp: Type[Any]) -> bool:
"""
Runtime type checking. Validate, that a given object is of a given type.
"""
try:
- _validated_object_type(tp, obj)
+ NoRenameBaseSchema.validated_object_type(tp, obj)
return True
except (DataValidationError, ValueError):
return False
+
+
+T = TypeVar("T")
+
+
+def load(cls: Type[T], obj: Any, default: Any = ..., use_default: bool = False) -> T:
+ return NoRenameBaseSchema.validated_object_type(cls, obj, default, use_default)
+
+
+class BaseSchema(NoRenameBaseSchema):
+ """
+ In Knot Resolver Manager, we need renamed keys most of the time, as we are using the modelling
+ tools mostly for configuration schema. That's why the normal looking name BaseSchema does renaming
+ and NoRenameBaseSchema is the opposite.
+ """
+
+ def __init__(self, source: TSource = None, object_path: str = ""):
+ if isinstance(source, dict):
+ source = renamed(source)
+ super().__init__(source, object_path)
+
+ @classmethod
+ def _validated_dict(
+ cls: Type["BaseSchema"], tp: Type[Any], obj: Dict[Any, Any], object_path: str
+ ) -> Dict[Any, Any]:
+ if isinstance(obj, Renamed):
+ obj = obj.original()
+ return super()._validated_dict(tp, obj, object_path)
--- /dev/null
+"""
+Implements JSON pointer resolution based on RFC 6901:
+https://www.rfc-editor.org/rfc/rfc6901
+"""
+
+
+from typing import Any, Optional, Tuple, Union
+
+# JSONPtrAddressable = Optional[Union[Dict[str, "JSONPtrAddressable"], List["JSONPtrAddressable"], int, float, bool, str, None]]
+JSONPtrAddressable = Any # the recursive definition above is not valid :(
+
+
+class _JSONPtr:
+ @staticmethod
+ def _decode_token(token: str) -> str:
+ """
+ Resolves escaped characters ~ and /
+ """
+
+ # the order of the replace statements is important, do not change without
+ # consulting the RFC
+ return token.replace("~1", "/").replace("~0", "~")
+
+ @staticmethod
+ def _encode_token(token: str) -> str:
+ return token.replace("~", "~0").replace("/", "~1")
+
+ def __init__(self, ptr: str):
+ if ptr == "":
+ # pointer to the root
+ self.tokens = []
+
+ else:
+ if ptr[0] != "/":
+ raise SyntaxError(
+ f"JSON pointer '{ptr}' invalid: the first character MUST be '/' or the pointer must be empty"
+ )
+
+ ptr = ptr[1:]
+ self.tokens = [_JSONPtr._decode_token(tok) for tok in ptr.split("/")]
+
+ def resolve(
+ self, obj: JSONPtrAddressable
+ ) -> Tuple[Optional[JSONPtrAddressable], JSONPtrAddressable, Union[str, int, None]]:
+ """
+ Returns (Optional[parent], Optional[direct value], key of value in the parent object)
+ """
+
+ parent: Optional[JSONPtrAddressable] = None
+ current = obj
+ current_ptr = ""
+ token: Union[int, str, None] = None
+
+ for token in self.tokens:
+ if current is None:
+ raise ValueError(
+ f"JSON pointer cannot reference nested non-existent object: object at ptr '{current_ptr}' already points to None, cannot nest deeper with token '{token}'"
+ )
+
+ elif isinstance(current, (bool, int, float, str)):
+ raise ValueError(f"object at '{current_ptr}' is a scalar, JSON pointer cannot point into it")
+
+ else:
+ parent = current
+ if isinstance(current, list):
+ if token == "-":
+ current = None
+ else:
+ try:
+ token = int(token)
+ current = current[token]
+ except ValueError:
+ raise ValueError(
+ f"invalid JSON pointer: list '{current_ptr}' require numbers as keys, instead got '{token}'"
+ )
+
+ elif isinstance(current, dict):
+ current = current.get(token, None)
+
+ current_ptr += f"/{token}"
+
+ return parent, current, token
+
+
+def json_ptr_resolve(
+ obj: JSONPtrAddressable,
+ ptr: str,
+) -> Tuple[Optional[JSONPtrAddressable], Optional[JSONPtrAddressable], Union[str, int, None]]:
+ return _JSONPtr(ptr).resolve(obj)
-import base64
import json
from enum import Enum, auto
-from hashlib import blake2b
-from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
import yaml
-from typing_extensions import Literal
from yaml.constructor import ConstructorError
from yaml.nodes import MappingNode
-from knot_resolver_manager.utils.modeling.query import QueryTree
-
from .exceptions import DataParsingError
-
-
-class ParsedTree:
- """
- Simple wrapper for parsed data.
- Changes external naming convention (hyphen separator) to internal (snake_case) on the fly.
-
- IMMUTABLE, DO NOT MODIFY
- """
-
- @staticmethod
- def _convert_internal_field_name_to_external(name: Any) -> Any:
- if isinstance(name, str):
- return name.replace("_", "-")
- return name
-
- @staticmethod
- def _convert_external_field_name_to_internal(name: Any) -> Any:
- if isinstance(name, str):
- return name.replace("-", "_")
- return name
-
- def __init__(self, data: Union[Dict[str, Any], str, int, bool, List[Any]]):
- self._data = data
-
- def to_raw(self) -> Union[Dict[str, Any], str, int, bool, List[Any]]:
- return self._data
-
- def __getitem__(self, key: str) -> Any:
- assert isinstance(self._data, dict)
- return self._data[ParsedTree._convert_internal_field_name_to_external(key)]
-
- def is_dict(self) -> bool:
- return isinstance(self._data, dict)
-
- def type(self) -> Type[Any]:
- return type(self._data)
-
- def __contains__(self, key: str) -> bool:
- assert isinstance(self._data, dict)
- return ParsedTree._convert_internal_field_name_to_external(key) in self._data
-
- def __str__(self) -> str:
- return json.dumps(self._data, sort_keys=False, indent=2)
-
- def keys(self) -> Set[Any]:
- assert isinstance(self._data, dict)
- return {ParsedTree._convert_external_field_name_to_internal(key) for key in self._data.keys()}
-
- def query(
- self,
- op: Literal["get", "post", "delete", "patch", "put"],
- path: str,
- update_with: Optional["ParsedTree"] = None,
- ) -> "Tuple[ParsedTree, Optional[ParsedTree]]":
- new_root, res = QueryTree(self._data).query(
- op, path, update_with=QueryTree(update_with.to_raw()) if update_with is not None else None
- )
- return ParsedTree(new_root.to_raw()), ParsedTree(res.to_raw()) if res is not None else None
-
- @property
- def etag(self) -> str:
- m = blake2b(digest_size=15)
- m.update(json.dumps(self._data, sort_keys=True).encode("utf8"))
- return base64.urlsafe_b64encode(m.digest()).decode("utf8")
+from .renaming import renamed
# custom hook for 'json.loads()' to detect duplicate keys in data
YAML = auto()
JSON = auto()
- def parse_to_dict(self, text: str) -> ParsedTree:
+ def parse_to_dict(self, text: str) -> Any:
if self is _Format.YAML:
# RaiseDuplicatesLoader extends yaml.SafeLoader, so this should be safe
# https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
- return ParsedTree(yaml.load(text, Loader=_RaiseDuplicatesLoader)) # type: ignore
+ return renamed(yaml.load(text, Loader=_RaiseDuplicatesLoader)) # type: ignore
elif self is _Format.JSON:
- return ParsedTree(json.loads(text, object_pairs_hook=_json_raise_duplicates))
+ return renamed(json.loads(text, object_pairs_hook=_json_raise_duplicates))
else:
raise NotImplementedError(f"Parsing of format '{self}' is not implemented")
def from_mime_type(mime_type: str) -> "_Format":
formats = {
"application/json": _Format.JSON,
+ "application/yaml": _Format.YAML,
"application/octet-stream": _Format.JSON, # default in aiohttp
"text/vnd.yaml": _Format.YAML,
}
if mime_type not in formats:
raise DataParsingError(
- f"unsupported MIME type '{mime_type}', expected 'application/json' or 'text/vnd.yaml'"
+ f"unsupported MIME type '{mime_type}', expected 'application/json' or 'application/yaml'"
)
return formats[mime_type]
-def parse(data: str, mime_type: str) -> ParsedTree:
+def parse(data: str, mime_type: str) -> Any:
return _Format.from_mime_type(mime_type).parse_to_dict(data)
-def parse_yaml(data: str) -> ParsedTree:
+def parse_yaml(data: str) -> Any:
return _Format.YAML.parse_to_dict(data)
-def parse_json(data: str) -> ParsedTree:
+def parse_json(data: str) -> Any:
return _Format.JSON.parse_to_dict(data)
import copy
-import json
-import re
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from abc import ABC, abstractmethod
+from typing import Any, List, Optional, Tuple, Union
from typing_extensions import Literal
-from knot_resolver_manager.utils.modeling.exceptions import DataParsingError
+from knot_resolver_manager.utils.modeling.base_schema import NoRenameBaseSchema, load
+from knot_resolver_manager.utils.modeling.json_pointer import json_ptr_resolve
-class QueryTree:
- """
- Simple wrapper for raw data which allows modification queries to be run on top.
+class PatchError(Exception):
+ pass
- IMMUTABLE, DO NOT MODIFY
- """
- def is_scalar(self) -> bool:
+class Op(NoRenameBaseSchema, ABC):
+ @abstractmethod
+ def eval(self, fakeroot: Any) -> Any:
"""
- true if the object represents a primitive type
+ modifies the given fakeroot, returns a new one
"""
- return isinstance(self._data, (str, int, bool))
- def is_object(self) -> bool:
- """
- true if the object represents a list or dict
- """
- return isinstance(self._data, (list, dict))
+ def _resolve_ptr(self, fakeroot: Any, ptr: str) -> Tuple[Any, Any, Union[str, int, None]]:
+ # Lookup tree part based on the given JSON pointer
+ parent, obj, token = json_ptr_resolve(fakeroot["root"], ptr)
- def _is_list(self) -> bool:
- return isinstance(self._data, list)
+ # the lookup was on pure data, wrap the results in QueryTree
+ if parent is None:
+ parent = fakeroot
+ token = "root"
- def _is_dict(self) -> bool:
- return isinstance(self._data, dict)
+ assert token is not None
- def _upsert(self, key: str, value: "QueryTree") -> None:
- """
- WARNING!!! MUTATES THE TREE
+ return parent, obj, token
- update or insert
- """
- if isinstance(self._data, dict):
- self._data[key] = value.to_raw()
- elif isinstance(self._data, list):
- if key in self:
- self._data[int(key)] = value.to_raw()
+
+class AddOp(Op):
+ op: Literal["add"]
+ path: str
+ value: Any
+
+ def eval(self, fakeroot: Any) -> Any:
+ parent, _obj, token = self._resolve_ptr(fakeroot, self.path)
+
+ if isinstance(parent, dict):
+ parent[token] = self.value
+ elif isinstance(parent, list):
+ if token == "-":
+ parent.append(self.value)
else:
- raise DataParsingError("query invalid: can't set a value of an item in a list at a non-existent index")
+ assert isinstance(token, int)
+ parent.insert(token, self.value)
else:
- assert False, "this should never happen"
+ assert False, "never happens"
- def _append(self, value: "QueryTree") -> None:
- """
- WARNING!!! MUTATES THE TREE
+ return fakeroot
- append to a list
- """
- assert isinstance(self._data, list)
- self._data.append(value.to_raw())
- def _delete(self, key: str) -> None:
- """
- WARNING!!! MUTATES THE TREE
+class RemoveOp(Op):
+ op: Literal["remove"]
+ path: str
- deletes a key
- """
- assert self.is_object()
- if isinstance(self._data, list):
- del self._data[int(key)]
- elif isinstance(self._data, dict):
- del self._data[key]
- else:
- assert False, "never happens"
+ def eval(self, fakeroot: Any) -> Any:
+ parent, _obj, token = self._resolve_ptr(fakeroot, self.path)
+ del parent[token]
+ return fakeroot
- def value(self) -> Union[str, int, bool]:
- if self.is_object():
- raise DataParsingError("attempted to access object as a scalar")
- assert isinstance(self._data, (str, int, bool)) # make type checker happy
- return self._data
+class ReplaceOp(Op):
+ op: Literal["replace"]
+ path: str
+ value: str
- def __init__(self, data: Union[Dict[str, Any], str, int, bool, List[Any]]):
- self._data = data
+ def eval(self, fakeroot: Any) -> Any:
+ parent, obj, token = self._resolve_ptr(fakeroot, self.path)
- def to_raw(self) -> Union[Dict[str, Any], str, int, bool, List[Any]]:
- return self._data
+ if obj is None:
+ raise PatchError("the value you are trying to replace is null")
+ parent[token] = self.value
+ return fakeroot
- def __getitem__(self, key: Union[str, int]) -> "QueryTree":
- if self.is_scalar():
- raise DataParsingError(f"attempted to access scalar value '{self._data}' as an object type")
- if isinstance(self._data, list):
- return QueryTree(self._data[int(key)])
- elif isinstance(self._data, dict):
- return QueryTree(self._data[str(key)])
- else:
- raise RuntimeError("unexpected type in self._data, this should never happen")
+class MoveOp(Op):
+ op: Literal["move"]
+ source: str
+ path: str
- def __contains__(self, key: Union[str, int]) -> bool:
- if self.is_scalar():
- raise DataParsingError(f"attempted to access scalar value '{self._data}' as an object type")
+ def _source(self, source):
+ if "from" not in source:
+ raise ValueError("missing property 'from' in 'move' JSON patch operation")
+ return str(source["from"])
- if isinstance(self._data, list):
- return int(key) < len(self._data)
- elif isinstance(self._data, dict):
- return key in self._data
- else:
- raise RuntimeError("unexpected type in self._data, this should never happen")
+ def eval(self, fakeroot: Any) -> Any:
+ if self.path.startswith(self.source):
+ raise PatchError("can't move value into itself")
- def __str__(self) -> str:
- return json.dumps(self._data, sort_keys=False, indent=2)
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
+ newobj = copy.deepcopy(obj)
- def keys(self) -> Set[Any]:
- if self.is_scalar():
- raise DataParsingError(f"attempted to access scalar value '{self._data}' as an object type")
+ fakeroot = RemoveOp({"op": "remove", "path": self.source}).eval(fakeroot)
+ fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
+ return fakeroot
- if isinstance(self._data, dict):
- return set(self._data.keys())
- elif isinstance(self._data, list):
- return set(range(len(self._data)))
- else:
- raise RuntimeError("unexpected type in self._data, this should never happen")
- _SUBTREE_MUTATION_PATH_PATTERN = re.compile(r"^(/[^/]+)*/?$")
+class CopyOp(Op):
+ op: Literal["copy"]
+ source: str
+ path: str
- def _preprocess_query_path(self, path: str) -> str:
- # prepare and validate the path object
- path = path[:-1] if path.endswith("/") else path
- if re.match(QueryTree._SUBTREE_MUTATION_PATH_PATTERN, path) is None:
- raise DataParsingError("Provided object path for mutation is invalid.")
- if "_" in path:
- raise DataParsingError("Provided object path contains character '_', which is illegal")
+ def _source(self, source):
+ if "from" not in source:
+ raise ValueError("missing property 'from' in 'copy' JSON patch operation")
+ return str(source["from"])
- # now, the path variable should contain '/' separated field names
- return path.strip("/")
+ def eval(self, fakeroot: Any) -> Any:
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
+ newobj = copy.deepcopy(obj)
- def _copy_and_find(self, path: str) -> Tuple["QueryTree", "QueryTree", Optional["QueryTree"], str]:
- """
- Returns (fakeroot, parent, Optional[queryTarget])
+ fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
+ return fakeroot
- - fakeroot has the real root in a field called 'root'
- - queryTarget is None, when it refers to non-existent object
- """
- path = self._preprocess_query_path(path)
-
- # `self` is considered immutable, do all operations on a copy
- rwcopy = copy.deepcopy(self)
- # make a fake root, so that we do not have to handle special cases for root node
- rwcopy._data = {"root": rwcopy._data} # pylint: disable=protected-access
- segments = f"root/{path}".strip("/").split("/")
-
- # walk the tree
- obj: Optional[QueryTree] = rwcopy
- parent: QueryTree = rwcopy
- segment = "" # just to make type checker happy
- for segment in segments:
- assert len(segment) > 0
- if obj is None:
- raise DataParsingError(
- f"query path does not point to any existing object in the configuration tree, first missing path segment is called '{segment}'"
- )
- elif segment in obj:
- parent = obj
- obj = obj[segment]
- else:
- parent = obj
- obj = None
-
- return rwcopy, parent, obj, segment
-
- @staticmethod
- def _post(
- fakeroot: "QueryTree",
- parent: "QueryTree",
- obj: Optional["QueryTree"],
- name: str,
- update_with: Optional["QueryTree"] = None,
- ) -> "Tuple[QueryTree, Optional[QueryTree]]":
- # pylint: disable=protected-access
- if update_with is None:
- raise DataParsingError("query invalid: can't request a change via POST and not provide a value")
- if parent._is_dict():
- parent._upsert(name, update_with)
- return fakeroot["root"], None
- elif parent._is_list():
- if obj is None:
- parent._append(update_with)
- return fakeroot["root"], None
- else:
- parent._upsert(name, update_with)
- return fakeroot["root"], None
- else:
- assert False, "this should never happen"
-
- @staticmethod
- def _patch(
- fakeroot: "QueryTree",
- parent: "QueryTree",
- obj: Optional["QueryTree"],
- name: str,
- update_with: Optional["QueryTree"] = None,
- ) -> "Tuple[QueryTree, Optional[QueryTree]]":
- # pylint: disable=protected-access
- if update_with is None:
- raise DataParsingError("query invalid: can't request a change via PATCH and not provide a value")
- if obj is None:
- raise DataParsingError("query error: can't update non-existent object")
- else:
- parent._upsert(name, update_with)
- return fakeroot["root"], None
-
- @staticmethod
- def _put(
- fakeroot: "QueryTree",
- parent: "QueryTree",
- obj: Optional["QueryTree"],
- name: str,
- update_with: Optional["QueryTree"] = None,
- ) -> "Tuple[QueryTree, Optional[QueryTree]]":
- # pylint: disable=protected-access
- if update_with is None:
- raise DataParsingError("query invalid: can't request an insert via PUT and not provide a value")
- if obj is None:
- # FIXME probably a bug, this is weird
- if parent._is_list():
- parent._append(update_with)
- return fakeroot["root"], None
- elif parent._is_dict():
- parent._upsert(name, update_with)
- return fakeroot["root"], None
- else:
- assert False, "never happens"
- else:
- raise DataParsingError("query invalid: can't insert when there is already a value there")
+class TestOp(Op):
+ op: Literal["test"]
+ path: str
+ value: Any
- def query(
- self, op: Literal["get", "post", "delete", "patch", "put"], path: str, update_with: Optional["QueryTree"] = None
- ) -> "Tuple[QueryTree, Optional[QueryTree]]":
- """
- Implements a modification API in the style of Caddy:
- https://caddyserver.com/docs/api
- """
- # pylint: disable=protected-access
- fakeroot, parent, obj, name = self._copy_and_find(path)
+ def eval(self, fakeroot: Any) -> Any:
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.path)
- # get = return what the path selector picks
- if op == "get":
- return fakeroot["root"], obj
+ if obj != self.value:
+ raise PatchError("test failed")
- # post = set value at a key, append to lists
- elif op == "post":
- return self._post(fakeroot, parent, obj, name, update_with)
+ return fakeroot
- # delete = remove the given key
- elif op == "delete":
- parent._delete(name)
- return fakeroot["root"], None
- # patch = update an existing object
- elif op == "patch":
- return self._patch(fakeroot, parent, obj, name, update_with)
+def query(
+ original: Any, method: Literal["get", "delete", "put", "patch"], ptr: str, payload: Any
+) -> Tuple[Any, Optional[Any]]:
+ """
+ Implements a modification API in the style of Caddy:
+ https://caddyserver.com/docs/api
+ """
+
+ ########################################
+ # Prepare data we will be working on
+
+ # First of all, we consider the original data to be immutable. So we need to make a copy
+ # in order to freely mutate them
+ dataroot = copy.deepcopy(original)
+
+ # To simplify referencing the root, create a fake root node
+ fakeroot = {"root": dataroot}
+
+ #########################################
+ # Handle the actual requested operation
- # put = insert and never replace
- elif op == "put":
- return self._put(fakeroot, parent, obj, name, update_with)
+ # get = return what the path selector picks
+ if method == "get":
+ parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
+ return fakeroot["root"], obj
+ elif method == "delete":
+ fakeroot = RemoveOp({"op": "remove", "path": ptr}).eval(fakeroot)
+ return fakeroot["root"], None
+
+ elif method == "put":
+ parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
+ assert parent is not None # we know this due to the fakeroot
+ if isinstance(parent, list) and token == "-":
+ parent.append(payload)
else:
- assert False, "invalid operation"
+ parent[token] = payload
+ return fakeroot["root"], None
+
+ elif method == "patch":
+ tp = List[Union[AddOp, RemoveOp, MoveOp, CopyOp, TestOp, ReplaceOp]]
+ transaction: tp = load(tp, payload)
+
+ for i, op in enumerate(transaction):
+ try:
+ fakeroot = op.eval(fakeroot)
+ except PatchError as e:
+ raise ValueError(f"json patch transaction failed on step {i}") from e
+
+ return fakeroot["root"], None
+
+ else:
+ assert False, "invalid operation, never happens"
--- /dev/null
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, TypeVar
+
+
+class Renamed(ABC):
+ @abstractmethod
+ def original(self) -> Any:
+ """
+ Returns a data structure, which is the source without dynamic renamings
+ """
+
+ @staticmethod
+ def map_public_to_private(name: Any) -> Any:
+ if isinstance(name, str):
+ return name.replace("_", "-")
+ return name
+
+ @staticmethod
+ def map_private_to_public(name: Any) -> Any:
+ if isinstance(name, str):
+ return name.replace("-", "_")
+ return name
+
+
+K = TypeVar("K")
+V = TypeVar("V")
+
+
+class RenamedDict(Dict[K, V], Renamed):
+ def keys(self) -> Any:
+ keys = super().keys()
+ return {Renamed.map_private_to_public(key) for key in keys}
+
+ def __getitem__(self, key: K) -> V:
+ key = Renamed.map_public_to_private(key)
+ res = super().__getitem__(key)
+ return renamed(res)
+
+ def __setitem__(self, key: K, value: V) -> None:
+ key = Renamed.map_public_to_private(key)
+ return super().__setitem__(key, value)
+
+ def __contains__(self, key: object) -> bool:
+ key = Renamed.map_public_to_private(key)
+ return super().__contains__(key)
+
+ def items(self) -> Any:
+ for k, v in super().items():
+ yield Renamed.map_private_to_public(k), renamed(v)
+
+ def original(self) -> Dict[K, V]:
+ return dict(super().items())
+
+
+class RenamedList(List[V], Renamed): # type: ignore
+ def __getitem__(self, key: Any) -> Any:
+ res = super().__getitem__(key)
+ return renamed(res)
+
+ def original(self) -> Any:
+ return list(super().__iter__())
+
+
+def renamed(obj: Any) -> Any:
+ if isinstance(obj, dict):
+ return RenamedDict(**obj)
+ elif isinstance(obj, list):
+ return RenamedList(obj)
+ else:
+ return obj
--- /dev/null
+from pyparsing import empty
+
+from knot_resolver_manager.utils.etag import structural_etag
+
+
+def test_etag():
+ empty1 = {}
+ empty2 = {}
+
+ assert structural_etag(empty1) == structural_etag(empty2)
+
+ something1 = {"something": 1}
+ something2 = {"something": 2}
+ assert structural_etag(empty1) != structural_etag(something1)
+ assert structural_etag(something1) != structural_etag(something2)
--- /dev/null
+from pytest import raises
+
+from knot_resolver_manager.utils.modeling.json_pointer import json_ptr_resolve
+
+# example adopted from https://www.sitepoint.com/json-server-example/
+TEST = {
+ "clients": [
+ {
+ "id": "59761c23b30d971669fb42ff",
+ "isActive": True,
+ "age": 36,
+ "name": "Dunlap Hubbard",
+ "gender": "male",
+ "company": "CEDWARD",
+ "email": "dunlaphubbard@cedward.com",
+ "phone": "+1 (890) 543-2508",
+ "address": "169 Rutledge Street, Konterra, Northern Mariana Islands, 8551",
+ },
+ {
+ "id": "59761c233d8d0f92a6b0570d",
+ "isActive": True,
+ "age": 24,
+ "name": "Kirsten Sellers",
+ "gender": "female",
+ "company": "EMERGENT",
+ "email": "kirstensellers@emergent.com",
+ "phone": "+1 (831) 564-2190",
+ "address": "886 Gallatin Place, Fannett, Arkansas, 4656",
+ },
+ {
+ "id": "59761c23fcb6254b1a06dad5",
+ "isActive": True,
+ "age": 30,
+ "name": "Acosta Robbins",
+ "gender": "male",
+ "company": "ORGANICA",
+ "email": "acostarobbins@organica.com",
+ "phone": "+1 (882) 441-3367",
+ "address": "697 Linden Boulevard, Sattley, Idaho, 1035",
+ },
+ ]
+}
+
+
+def test_json_ptr():
+ parent, res, token = json_ptr_resolve(TEST, "")
+ assert parent is None
+ assert res is TEST
+
+ parent, res, token = json_ptr_resolve(TEST, "/")
+ assert parent is TEST
+ assert res is None
+ assert token == ""
+
+ parent, res, token = json_ptr_resolve(TEST, "/clients/2/gender")
+ assert parent is TEST["clients"][2]
+ assert res == "male"
+ assert token == "gender"
+
+ with raises(ValueError):
+ _ = json_ptr_resolve(TEST, "//")
+
+ with raises(SyntaxError):
+ _ = json_ptr_resolve(TEST, "invalid/ptr")
+
+ with raises(ValueError):
+ _ = json_ptr_resolve(TEST, "/clients/2/gender/invalid")
+
+ parent, res, token = json_ptr_resolve(TEST, "/~01")
+ assert parent is TEST
+ assert res is None
+ assert token == "~1"
+++ /dev/null
-from pyparsing import empty
-
-from knot_resolver_manager.utils.modeling import ParsedTree
-
-
-def test_etag():
- empty1 = ParsedTree({})
- empty2 = ParsedTree({})
-
- assert empty1.etag == empty2.etag
-
- something1 = ParsedTree({"something": 1})
- something2 = ParsedTree({"something": 2})
- assert empty1.etag != something1.etag
- assert something1.etag != something2.etag
-from typing import List, Optional
-
from pytest import raises
-from knot_resolver_manager.utils.modeling.base_schema import BaseSchema
-from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
-from knot_resolver_manager.utils.modeling.parsing import parse_json, parse_yaml
-
-
-class InnerSchema(BaseSchema):
- size: int = 5
- lst: Optional[List[int]]
-
-
-class ConfSchema(BaseSchema):
- workers: int
- lua_config: Optional[str]
- inner: InnerSchema = InnerSchema()
-
- def _validate(self) -> None:
- super()._validate()
- if self.workers < 0:
- raise DataValidationError("ee", "/workers")
-
-
-YAML = """
-workers: 1
-lua-config: something
-"""
-REF = parse_yaml(YAML)
-
-
-def test_patch():
- o = ConfSchema(REF)
- assert o.lua_config == "something"
- assert o.workers == 1
- assert o.inner.size == 5
-
- # replacement of 'lua-config' attribute
- upd, _resp = REF.query("patch", "/lua-config", parse_json('"new_value"'))
- o = ConfSchema(upd)
- assert o.lua_config == "new_value"
- assert o.inner.size == 5
- assert o.workers == 1
-
- # replacement of the whole tree
- upd, _resp = REF.query("patch", "/", parse_json('{"inner": {"size": 55}, "workers": 8}'))
- o = ConfSchema(upd)
- assert o.lua_config is None
- assert o.workers == 8
- assert o.inner.size == 55
-
- # raise validation DataValidationError
- with raises(DataValidationError):
- upd, _resp = REF.query("patch", "/", parse_json('{"workers": -5}'))
- o = ConfSchema(upd)
-
+from knot_resolver_manager.utils.modeling.query import query
-def test_put_and_delete():
- # insert 'inner' subtree
- upd, _resp = REF.query("put", "/inner", parse_json('{"size": 33}'))
- o = ConfSchema(upd)
- assert o.lua_config == "something"
- assert o.workers == 1
- assert o.inner.size == 33
- upd, _resp = upd.query("put", "/inner/lst", parse_json("[1,2,3]"))
- o = ConfSchema(upd)
- assert tuple(o.inner.lst or []) == tuple([1, 2, 3])
+def test_example_from_spec():
+ # source of the example: https://jsonpatch.com/
+ original = {"baz": "qux", "foo": "bar"}
+ patch = [
+ {"op": "replace", "path": "/baz", "value": "boo"},
+ {"op": "add", "path": "/hello", "value": ["world"]},
+ {"op": "remove", "path": "/foo"},
+ ]
+ expected = {"baz": "boo", "hello": ["world"]}
- upd, _resp = upd.query("delete", "/inner/lst/1")
- o = ConfSchema(upd)
- assert tuple(o.inner.lst or []) == tuple([1, 3])
+ result, _ = query(original, "patch", "", patch)
- upd, _resp = upd.query("delete", "/inner/lst")
- o = ConfSchema(upd)
- assert o.inner.lst is None
+ assert result == expected
--- /dev/null
+from knot_resolver_manager.utils.modeling.renaming import renamed
+
+
+def test_all():
+ ref = {
+ "awesome-customers": [{"name": "John", "home-address": "London"}, {"name": "Bob", "home-address": "Prague"}],
+ "storage": {"bobby-pin": 5, "can-opener": 0, "laptop": 1},
+ }
+
+ rnm = renamed(ref)
+ assert rnm["awesome_customers"][0]["home_address"] == "London"
+ assert rnm["awesome_customers"][1:][0]["home_address"] == "Prague"
+ assert set(rnm["storage"].items()) == set((("can_opener", 0), ("bobby_pin", 5), ("laptop", 1)))
+ assert set(rnm["storage"].keys()) == set(("bobby_pin", "can_opener", "laptop"))
+
+
+def test_nested_init():
+ val = renamed(renamed(({"ke-y": "val-ue"})))
+ assert val["ke_y"] == "val-ue"
+
+
+def test_original():
+ obj = renamed(({"ke-y": "val-ue"})).original()
+ assert obj["ke-y"] == "val-ue"