)
-TSource = Union[NoneType, Dict[Any, Any], ParsedTree, "SchemaNode"]
+TSource = Union[NoneType, ParsedTree, "SchemaNode"]
class SchemaNode:
if self._PREVIOUS_SCHEMA is not None:
source = self._PREVIOUS_SCHEMA(source, object_path=object_path) # pylint: disable=not-callable
+ # make sure that all raw data checks passed on the source object
+ if isinstance(source, dict):
+ source = ParsedTree(source)
+
cls = self.__class__
annot = cls.__dict__.get("__annotations__", {})
if is_internal_field_name(name):
continue
- # convert naming (used when converting from json/yaml)
- source_name = name.replace("_", "-") if isinstance(source, dict) else name
-
# populate field
if not source:
val = None
# we have a way how to create the value
elif hasattr(self, f"_{name}"):
val = self._get_converted_value(name, source, object_path)
- used_keys.add(source_name) # the field might not exist, but that won't break anything
+ used_keys.add(name) # the field might not exist, but that won't break anything
# source just contains the value
- elif source_name in source:
- val = source[source_name]
- used_keys.add(source_name)
+ elif name in source:
+ val = source[name]
+ used_keys.add(name)
# there is a default value and in the source, the value is missing
elif getattr(self, name, ...) is not ...:
val = None
val = None
# we expected a value but it was not there
else:
- raise SchemaException(f"Missing attribute '{source_name}'.", object_path)
+ raise SchemaException(f"Missing attribute '{name}'.", object_path)
use_default = hasattr(cls, name)
default = getattr(cls, name, ...)
setattr(self, name, value)
# check for unused keys in case the
- if source and isinstance(source, dict):
+ if source and not isinstance(source, SchemaNode):
unused = source.keys() - used_keys
if len(unused) > 0:
raise SchemaException(
import json
import re
from enum import Enum, auto
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, KeysView, List, Optional, Tuple, Union, cast
import yaml
from yaml.constructor import ConstructorError
IMMUTABLE, DO NOT MODIFY
"""
- def __init__(self, dct: Dict[str, Any]):
- self.data = dct
-
- def to_dict(self) -> Dict[str, Any]:
+ @staticmethod
+ def _convert_to_underscores(dct: Dict[Any, Any]) -> Dict[Any, Any]:
+ assert isinstance(dct, dict)
+ res: Dict[Any, Any] = {}
+ for key in dct:
+ assert isinstance(key, str)
+
+ # rename & convert recursively
+ obj = dct[key]
+ if isinstance(obj, dict):
+ obj = ParsedTree._convert_to_underscores(cast(Dict[Any, Any], obj))
+ res[key.replace("-", "_")] = obj
+
+ return res
+
+ def __init__(self, data: Union[Dict[str, Any], str, int, bool]):
+ if isinstance(data, dict):
+ data = ParsedTree._convert_to_underscores(data)
+ self.data = data
+
+ def to_raw(self) -> Union[Dict[str, Any], str, int, bool]:
return self.data
def __getitem__(self, key: str):
+ assert isinstance(self.data, dict)
return self.data[key]
def __contains__(self, key: str):
+ assert isinstance(self.data, dict)
return key in self.data
+ def __str__(self) -> str:
+ return json.dumps(self.data, sort_keys=False, indent=2)
+
+ def keys(self) -> KeysView[Any]:
+ assert isinstance(self.data, dict)
+ return self.data.keys()
+
_SUBTREE_MUTATION_PATH_PATTERN = re.compile(r"^(/[^/]+)*/?$")
- def update(self, document_path: str, data: "ParsedTree") -> "ParsedTree":
+ def update(self, path: str, data: "ParsedTree") -> "ParsedTree":
# prepare and validate the path object
- path = document_path[:-1] if document_path.endswith("/") else document_path
+ path = path[:-1] if path.endswith("/") else path
if re.match(ParsedTree._SUBTREE_MUTATION_PATH_PATTERN, path) is None:
raise ParsingException("Provided object path for mutation is invalid.")
+ if "_" in path:
+ raise ParsingException("Provided object path contains character '_', which is illegal")
+ path = path.replace("-", "_")
path = path[1:] if path.startswith("/") else path
# now, the path variable should contain '/' separated field names
return data
# find the subtree we will replace in a copy of the original object
- to_mutate = copy.deepcopy(self.to_dict())
+ to_mutate = copy.deepcopy(self.to_raw())
obj = to_mutate
parent = None
- for dash_segment in path.split("/"):
- segment = dash_segment.replace("-", "_")
+ for segment in path.split("/"):
+ assert isinstance(obj, dict)
if segment == "":
raise ParsingException(f"Unexpectedly empty segment in path '{path}'")
)
elif segment in obj:
parent = obj
- obj = getattr(parent, segment)
+ obj = obj[segment]
elif segment not in obj:
parent = obj
obj = {}
assert parent is not None
# assign the subtree
- last_name = path.split("/")[-1].replace("-", "_")
- parent[last_name] = data.to_dict()
+ last_name = path.split("/")[-1]
+ parent[last_name] = data.to_raw()
return ParsedTree(to_mutate)