]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
utils: detect duplicate keys in configuration
authorAleš <ales.mrazek@nic.cz>
Mon, 3 May 2021 07:19:14 +0000 (09:19 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:52 +0000 (16:17 +0200)
- custom hook for json loader
- custom loader for PyYAML

manager/knot_resolver_manager/utils/dataclasses_parservalidator.py

index 13f76bf47bd02e5d747ce5098727260f8e7ee5f2..6d893f2e08a85438f6c4afb204fed59d0c65fed3 100644 (file)
@@ -1,7 +1,9 @@
 import json
-from typing import Any, Type, TypeVar
+from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
 
 import yaml
+from yaml.constructor import ConstructorError
+from yaml.nodes import MappingNode
 
 from knot_resolver_manager.utils.types import (
     get_generic_type_argument,
@@ -16,7 +18,6 @@ from knot_resolver_manager.utils.types import (
 
 from ..compat.dataclasses import is_dataclass
 
-
 class ValidationException(Exception):
     pass
 
@@ -144,6 +145,42 @@ def _from_dictlike_obj(cls: Any, obj: Any, default: Any, use_default: bool) -> A
         )
 
 
+# custom hook for 'json.loads()' to detect duplicate keys in data
+# source: https://stackoverflow.com/questions/14902299/json-loads-allows-duplicate-keys-in-a-dictionary-overwriting-the-first-value
+def json_raise_duplicates(pairs: List[Tuple[Any, Any]]) -> Optional[Any]:
+    dict_out: Dict[Any,Any] = {}
+    for key, val in pairs:
+        if key in dict_out:
+            raise ValidationException(f"duplicate key detected: {key}")
+        else:
+            dict_out[key] = val
+    return dict_out
+
+
+# custom loader for 'yaml.load()' to detect duplicate keys in data
+# source: https://gist.github.com/pypt/94d747fe5180851196eb
+class RaiseDuplicatesLoader(yaml.SafeLoader):
+    def construct_mapping(self: yaml.SafeLoader, node: Union[MappingNode, Any], deep: bool=False) -> Dict[Any,Any]:
+        if not isinstance(node, MappingNode):
+            raise ConstructorError(None, None,
+                    "expected a mapping node, but found %s" % node.id,
+                    node.start_mark)
+        mapping: Dict[Any,Any] = {}
+        for key_node, value_node in node.value:
+            key = self.construct_object(key_node, deep=deep)    # type: ignore
+            try:
+                hash(key)   # type: ignore
+            except TypeError as exc:
+                raise ConstructorError("while constructing a mapping", node.start_mark,
+                       "found unacceptable key (%s)" % exc, key_node.start_mark)
+            # check for duplicate keys
+            if key in mapping:
+                raise ValidationException(f"duplicate key detected: {key_node.start_mark}")
+            value = self.construct_object(value_node, deep=deep)    # type: ignore
+            mapping[key] = value
+        return mapping
+
+
 _T = TypeVar("_T", bound="DataclassParserValidatorMixin")
 
 
@@ -169,14 +206,14 @@ class DataclassParserValidatorMixin:
 
     @classmethod
     def from_yaml(cls: Type[_T], text: str, default: _T = ..., use_default: bool = False) -> _T:
-        data = yaml.safe_load(text)
+        data = yaml.load(text, Loader=RaiseDuplicatesLoader)    # type: ignore
         config: _T = _from_dictlike_obj(cls, data, default, use_default)
         config.validate()
         return config
 
     @classmethod
     def from_json(cls: Type[_T], text: str, default: _T = ..., use_default: bool = False) -> _T:
-        data = json.loads(text)
+        data = json.loads(text, object_pairs_hook=json_raise_duplicates)
         config: _T = _from_dictlike_obj(cls, data, default, use_default)
         config.validate()
         return config