import json
from enum import Enum, auto
-from typing import TYPE_CHECKING, Any
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Dict, Hashable, List, Union
import yaml
from yaml.constructor import ConstructorError
-from knot_resolver.utils.modeling.errors import DataParsingError
+from knot_resolver.utils.modeling.errors import DataParsingError, DataReadingError, DataTypeError
if TYPE_CHECKING:
from yaml.nodes import MappingNode
+_YAML_INCLUDE_KEY = "include"
+_YAML_INCLUDE_TAG = "!include"
-def _json_raise_duplicates(pairs: list[tuple[Any, Any]]) -> dict[Any, Any]:
- """
- JSON hook used in 'json.loads()' that detects duplicate keys in the parsed data.
- The code for this hook was highly inspired by: https://stackoverflow.com/q/14902299/12858520
+class ParsedDataWrapper:
+    """A wrapper pairing parsed data with the file it was read from.
+
+ Attributes:
+ data (ParsedData): Data that has been read and parsed from the file.
+        file (Path): The path to the file containing the data (normalized to Path).
+
"""
- mapping: dict[Any, Any] = {}
- for key, value in pairs:
- if key in mapping:
- msg = f"duplicate key detected: {key}"
- raise DataParsingError(msg)
- mapping[key] = value
- return mapping
+ def __init__(self, data: ParsedData, file: str | Path):
+ self.data = data
+ self.file = Path(file)
+
+
+ParsedData = Union[Dict[str, "ParsedData"], List["ParsedData"], ParsedDataWrapper, str, int, float, bool, None]
-class _YAMLRaiseDuplicatesLoader(yaml.SafeLoader):
+
+def _yaml_include_constructor(self: _YAMLRaiseDuplicatesIncludeLoader, node: MappingNode) -> ParsedDataWrapper:
+    """Construct an include wrapper for detected '!include' tags.
+
+ The code for this constructor was highly inspired by:
+ https://gist.github.com/joshbode/569627ced3076931b02f
"""
- YAML loader used in 'yaml.loads()' that detects duplicate keys in the parsed data.
+ file_path = Path(self.construct_scalar(node))
+ if not file_path.is_absolute() and self.stream_path:
+ file_path = self.stream_path.parent / file_path
+ return try_to_parse_file(file_path)
+
+
+class _YAMLRaiseDuplicatesIncludeLoader(yaml.SafeLoader):
+    """Custom YAML loader used in 'yaml.load()'.
+
+    The loader detects duplicate keys in the parsed data.
+    It also resolves '!include' tags by loading data from the referenced files.
The code for this loader was highly inspired by: https://gist.github.com/pypt/94d747fe5180851196eb
The loader extends yaml.SafeLoader, so it should be safe, even though the linter reports unsafe-yaml-load (S506).
More about safe loader: https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
"""
- def construct_mapping(self, node: MappingNode, deep: bool = False) -> dict[Any, Any]:
- mapping: dict[Any, Any] = {}
+ def __init__(self, stream: str, stream_path: str | Path | None = None) -> None:
+ self.stream_path = Path(stream_path) if stream_path else None
+ self.add_constructor(_YAML_INCLUDE_TAG, _yaml_include_constructor)
+ super().__init__(stream)
+
+ def construct_mapping(self, node: MappingNode, deep: bool = False) -> dict[Hashable, Any]:
+ mapping: dict[Hashable, Any] = {}
for key_node, value_node in node.value:
key = self.construct_object(key_node, deep=deep)
# we need to check, that the key object can be used in a hash table
return mapping
+def _json_raise_duplicates(pairs: list[tuple[str, ParsedData]]) -> dict[str, ParsedData]:
+ """JSON hook used in 'json.loads()' that detects duplicate keys in the parsed data.
+
+ The code for this hook was highly inspired by: https://stackoverflow.com/q/14902299/12858520
+ """
+ mapping: dict[str, ParsedData] = {}
+ for key, value in pairs:
+ if key in mapping:
+ msg = f"duplicate key detected: {key}"
+ raise DataParsingError(msg)
+ mapping[key] = value
+ return mapping
+
+
+def _include_key_root(parsed_data: ParsedDataWrapper) -> ParsedDataWrapper:
+ data = parsed_data.data
+ base_path = parsed_data.file.parent
+
+ if isinstance(data, ParsedDataWrapper):
+ parsed_data.data = _include_key_root(data)
+
+ elif isinstance(data, dict) and _YAML_INCLUDE_KEY in data:
+ files = data[_YAML_INCLUDE_KEY]
+ parsed_files: list[ParsedData] = []
+
+ if isinstance(files, str):
+ file_path = Path(files)
+ if not file_path.is_absolute():
+ file_path = base_path / file_path
+ parsed_files.append(try_to_parse_file(file_path))
+
+ elif isinstance(files, list):
+ for file in files:
+ if isinstance(file, str):
+ file_path = Path(file)
+ if not file_path.is_absolute():
+ file_path = base_path / file_path
+ parsed_files.append(try_to_parse_file(file_path))
+ else:
+                    msg = f"expected string, got {type(file)}"
+ pointer = f"{parsed_data.file}:/{_YAML_INCLUDE_KEY}"
+ raise DataTypeError(msg, pointer)
+
+ else:
+ msg = f"expected string or list, got {type(files)}"
+ pointer = f"{parsed_data.file}:/{_YAML_INCLUDE_KEY}"
+ raise DataTypeError(msg, pointer)
+
+ data[_YAML_INCLUDE_KEY] = parsed_files
+
+ return parsed_data
+
+
class DataFormat(Enum):
YAML = auto()
JSON = auto()
- def loads(self, text: str) -> dict[Any, Any]:
+ def load_file(self, file: str | Path) -> ParsedData:
+        """Read the file, parse its contents in this data format, and return the parsed data."""
+ file_path = Path(file)
+ text = file_path.read_text()
+ if self is DataFormat.YAML:
+ loader = _YAMLRaiseDuplicatesIncludeLoader(text, file)
+ try:
+ return loader.get_single_data()
+ finally:
+ loader.dispose()
+ return self.load_str(text)
+
+ def load_str(self, text: str) -> ParsedData:
"""Load data from string in data format and return the data in dictionary."""
if self is DataFormat.YAML:
- return yaml.load(text, Loader=_YAMLRaiseDuplicatesLoader) # noqa: S506
+ return yaml.load(text, Loader=_YAMLRaiseDuplicatesIncludeLoader) # noqa: S506
if self is DataFormat.JSON:
return json.loads(text, object_pairs_hook=_json_raise_duplicates)
msg = f"parsing data from '{self}' format is not implemented"
raise NotImplementedError(msg)
- def dumps(self, data: dict[Any, Any], indent: int | None = None) -> str:
- """Dump dictionary data to string in required data format."""
- if self is DataFormat.YAML:
- return yaml.safe_dump(data, indent=indent)
- if self is DataFormat.JSON:
- return json.dumps(data, indent=indent)
- msg = f"exporting data to '{self}' format is not implemented"
- raise NotImplementedError(msg)
+ # def dump_str(self, data: ParsedData, indent: int | None = None) -> str:
+ # """Dump the parsed(dict) data into a string in the required format."""
+ # if self is DataFormat.YAML:
+ # return yaml.safe_dump(data, indent=indent)
+ # if self is DataFormat.JSON:
+ # return json.dumps(data, indent=indent)
+ # msg = f"exporting data to '{self}' format is not implemented"
+ # raise NotImplementedError(msg)
+
+
+def parse_json_str(data: str) -> ParsedData:
+    """Parse the JSON string and return the parsed data."""
+ return DataFormat.JSON.load_str(data)
-def parse_yaml(data: str) -> dict[Any, Any]:
- """Parse YAML string and return the data in dictionary."""
- return DataFormat.YAML.loads(data)
+def parse_json_file(file: str | Path) -> ParsedDataWrapper:
+    """Read and parse the JSON file and return its data wrapped in a ParsedDataWrapper."""
+ data = DataFormat.JSON.load_file(file)
+ return ParsedDataWrapper(data, file)
-def parse_json(data: str) -> dict[Any, Any]:
- """Parse JSON string and return the data in dictionary."""
- return DataFormat.JSON.loads(data)
+def parse_yaml_file(file: str | Path) -> ParsedDataWrapper:
+    """Read and parse the YAML file, resolve includes, and return its data wrapped in a ParsedDataWrapper."""
+ data = DataFormat.YAML.load_file(file)
+ return _include_key_root(ParsedDataWrapper(data, file))
-def try_to_parse(data: str) -> dict[Any, Any]:
- """Attempt to parse data string as a JSON or YAML and return it's dictionary."""
+def try_to_parse_file(file: str | Path) -> ParsedDataWrapper:
+    """Attempt to read and parse the file as JSON or YAML and return its data wrapped in a ParsedDataWrapper."""
try:
- return parse_json(data)
+ return parse_json_file(file)
+ except OSError as e:
+ raise DataReadingError(str(e), str(file)) from e
except json.JSONDecodeError:
try:
- return parse_yaml(data)
+ return parse_yaml_file(file)
except yaml.YAMLError as e:
# YAML parsing error should be sufficient because the JSON can be parsed by the YAML parser.
# We should receive a helpful error message for JSON as well.
- raise DataParsingError(e) from e
+ raise DataParsingError(str(e), str(file)) from e
+from pathlib import Path
+
import pytest
from knot_resolver.utils.modeling.errors import DataParsingError
-from knot_resolver.utils.modeling.parsing import parse_json, parse_yaml, try_to_parse
+from knot_resolver.utils.modeling.parsing import ParsedDataWrapper, parse_json_file, parse_yaml_file, try_to_parse_file
-json_data = """
-{
- "none": null,
- "boolean": false,
- "number": 2026,
- "string": "this is string",
- "object": {
- "number": 5000,
- "string": "this is object string"
- },
- "array": [
- "item1",
- "item2",
- "item3"
- ]
-}
-"""
+base_path = Path(__file__).parent / "test_parsing"
-json_data_duplicates = """
-{
- "duplicity-key": 1,
- "duplicity-key": 2
-}
-"""
-json_data_duplicates_inner = """
-{
- "object": {
- "duplicity-key": 1,
- "duplicity-key": 2
- }
-}
-"""
-
-yaml_data = """
-none: null
-boolean: false
-number: 2026
-string: this is string
-object:
- number: 5000
- string: this is object string
-array:
- - item1
- - item2
- - item3
-"""
-
-yaml_data_duplicates = """
-duplicity-key: 1
-duplicity-key: 2
-"""
-
-yaml_data_duplicates_inner = """
-object:
- duplicity-key: 1
- duplicity-key: 2
-"""
-
-data_dict = {
+result_dict = {
"none": None,
"boolean": False,
"number": 2026,
}
-def test_parse_json() -> None:
- data = parse_json(json_data)
- assert data == data_dict
+@pytest.mark.parametrize("file", ["data.json"])
+def test_parse_json_file(file: str) -> None:
+ file_path = base_path / file
+ wrapped_data = parse_json_file(file_path)
+ assert wrapped_data.file == file_path
+ assert wrapped_data.data == result_dict
-@pytest.mark.parametrize("data", [json_data, yaml_data])
-def test_parse_yaml(data: str) -> None:
- data = parse_yaml(data)
- assert data == data_dict
+@pytest.mark.parametrize("file", ["data.json", "data.yaml"])
+def test_parse_yaml_file(file: str) -> None:
+ file_path = base_path / file
+ wrapped_data = parse_yaml_file(file_path)
+ assert wrapped_data.file == file_path
+ assert wrapped_data.data == result_dict
-@pytest.mark.parametrize(
- "data",
- [
- json_data_duplicates,
- json_data_duplicates_inner,
- ],
-)
-def test_parse_json_duplicates(data: str) -> None:
+@pytest.mark.parametrize("file", ["duplicity.json", "duplicity.inner.json"])
+def test_parse_json_file_duplicity(file: str) -> None:
+ file_path = base_path / file
with pytest.raises(DataParsingError):
- parse_json(data)
+ parse_json_file(file_path)
@pytest.mark.parametrize(
- "data",
+ "file",
[
- json_data_duplicates,
- json_data_duplicates_inner,
- yaml_data_duplicates,
- yaml_data_duplicates_inner,
+ "duplicity.json",
+ "duplicity.inner.json",
+ "duplicity.yaml",
+ "duplicity.inner.yaml",
],
)
-def test_parse_yaml_duplicates(data: str) -> None:
+def test_parse_yaml_file_duplicity(file: str) -> None:
+ file_path = base_path / file
with pytest.raises(DataParsingError):
- parse_yaml(data)
-
-
-@pytest.mark.parametrize("data", [json_data, yaml_data])
-def test_try_to_parse(data: str) -> None:
- data = try_to_parse(data)
- assert data == data_dict
+ parse_yaml_file(file_path)
+
+
+@pytest.mark.parametrize("file", ["data.json", "data.yaml"])
+def test_try_to_parse_file(file: str) -> None:
+ file_path = base_path / file
+ wrapped_data = try_to_parse_file(file_path)
+ assert wrapped_data.file == file_path
+ assert wrapped_data.data == result_dict
+
+
+@pytest.mark.parametrize("file", ["include.root.yaml"])
+def test_try_to_parse_file_yaml_include_tag(file: str) -> None:
+ file_path = base_path / file
+ wrapped_data = try_to_parse_file(file_path)
+ assert wrapped_data.file == file_path
+ assert wrapped_data.data.file.parent == base_path
+ assert wrapped_data.data.data == result_dict
+
+
+@pytest.mark.parametrize("file", ["include.inner.yaml"])
+def test_try_to_parse_file_yaml_include_tag_inner(file: str) -> None:
+ file_path = base_path / file
+ wrapped_data = try_to_parse_file(file_path)
+ assert wrapped_data.file == file_path
+ assert wrapped_data.data["object"].data == result_dict["object"]
+
+
+@pytest.mark.parametrize("file", ["include-key.yaml"])
+def test_try_to_parse_file_yaml_include_key(file: str) -> None:
+ file_path = base_path / file
+ wrapped_data = try_to_parse_file(file_path)
+ assert wrapped_data.file == file_path
+ for key in ["none", "boolean", "number", "string"]:
+ assert wrapped_data.data[key] == result_dict[key]
+    for include in wrapped_data.data["include"]:
+        assert isinstance(include, ParsedDataWrapper)
+ data = include.data
+ if "object" in data:
+ assert data["object"] == result_dict["object"]
+ elif "array" in data:
+ assert data["array"] == result_dict["array"]
+        else:
+            pytest.fail(f"unexpected include data: {data!r}")