import json
-from typing import Any, Type, TypeVar
+from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
import yaml
+from yaml.constructor import ConstructorError
+from yaml.nodes import MappingNode
from knot_resolver_manager.utils.types import (
get_generic_type_argument,
from ..compat.dataclasses import is_dataclass
-
class ValidationException(Exception):
pass
)
+# custom hook for 'json.loads()' to detect duplicate keys in data
+# source: https://stackoverflow.com/questions/14902299/json-loads-allows-duplicate-keys-in-a-dictionary-overwriting-the-first-value
+def json_raise_duplicates(pairs: List[Tuple[Any, Any]]) -> Optional[Any]:
+ dict_out: Dict[Any,Any] = {}
+ for key, val in pairs:
+ if key in dict_out:
+ raise ValidationException(f"duplicate key detected: {key}")
+ else:
+ dict_out[key] = val
+ return dict_out
+
+
+# custom loader for 'yaml.load()' to detect duplicate keys in data
+# source: https://gist.github.com/pypt/94d747fe5180851196eb
+class RaiseDuplicatesLoader(yaml.SafeLoader):
+ def construct_mapping(self: yaml.SafeLoader, node: Union[MappingNode, Any], deep: bool=False) -> Dict[Any,Any]:
+ if not isinstance(node, MappingNode):
+ raise ConstructorError(None, None,
+ "expected a mapping node, but found %s" % node.id,
+ node.start_mark)
+ mapping: Dict[Any,Any] = {}
+ for key_node, value_node in node.value:
+ key = self.construct_object(key_node, deep=deep) # type: ignore
+ try:
+ hash(key) # type: ignore
+ except TypeError as exc:
+ raise ConstructorError("while constructing a mapping", node.start_mark,
+ "found unacceptable key (%s)" % exc, key_node.start_mark)
+ # check for duplicate keys
+ if key in mapping:
+ raise ValidationException(f"duplicate key detected: {key_node.start_mark}")
+ value = self.construct_object(value_node, deep=deep) # type: ignore
+ mapping[key] = value
+ return mapping
+
+
_T = TypeVar("_T", bound="DataclassParserValidatorMixin")
@classmethod
def from_yaml(cls: Type[_T], text: str, default: _T = ..., use_default: bool = False) -> _T:
- data = yaml.safe_load(text)
+ data = yaml.load(text, Loader=RaiseDuplicatesLoader) # type: ignore
config: _T = _from_dictlike_obj(cls, data, default, use_default)
config.validate()
return config
@classmethod
def from_json(cls: Type[_T], text: str, default: _T = ..., use_default: bool = False) -> _T:
- data = json.loads(text)
+ data = json.loads(text, object_pairs_hook=json_raise_duplicates)
config: _T = _from_dictlike_obj(cls, data, default, use_default)
config.validate()
return config