[tool.ruff.lint]
select = ["ALL"]
-ignore = [
- # * candidate for remove in the future
-
- # Annotations
- "ANN401", # Dynamically typed expressions (typing.Any) are disallowed
-
- # Arguments
- "ARG001", # Unused function argument
- "ARG002", # Unused method argument
- "ARG004", # Unused static method argument
-
- "B009", # *Replace `getattr` with attribute access
- "BLE001", # *Do not catch blind exception
- "C408", # *Unnecessary `tuple` call (rewrite as a literal)
- "C417", # *Unnecessary `map` usage (rewrite using a `list` comprehension)
- "COM812", # Trailing comma missing
-
- # docstring
- "D100", # Missing docstring in public module
- "D101", # Missing docstring in public class
- "D102", # Missing docstring in public method
- "D103", # Missing docstring in public function
- "D104", # Missing docstring in public package
- "D105", # Missing docstring in magic method (__int__, __str__, ...)
- "D106", # Missing docstring in public nested class
- "D107", # Missing docstring in `__init__`
- # docstring: lines
- "D202", # *No blank lines allowed after function docstring
- # docstring: incompatible
- "D203", # No blank lines allowed before class docstring; incompatible with D211
- "D212", # Multi-line docstring summary should start at the first line; incompatible with D213
-
- # exceptions
- "EM101", # *Exception must not use a string literal, assign to variable first
- "EM102", # *Exception must not use an f-string literal, assign to variable first
- "TRY003", # Avoid specifying long messages outside the exception class
- "TRY004", # *Prefer `TypeError` exception for invalid type
- "TRY301", # *Abstract `raise` to an inner function
- "TRY400", # *Use `logging.exception` instead of `logging.error`
-
- "ERA001", # *Found commented-out code
- "FA100", # (remove in py3.10) Add `from __future__ import annotations` to simplify typing
-
- # FIXME and TODO
- "FIX001",
- "FIX002", # Missing issue link on the line following this TODO
- "TD001",
- "TD002",
- "TD004",
-
- # boolean
- "FBT001", # Boolean-typed positional argument in function definition
- "FBT002", # Boolean default positional argument in function definition
- "FBT003", # Boolean positional value in function call
-
- # logging
- "G004", # *Logging statement uses f-string
- "G201", # *Logging `.exception(...)` should be used instead of `.error(..., exc_info=True)`
-
- "ISC001", # *Single-line implicit string concatenation
- "PERF203", # `try`-`except` within a loop incurs performance overhead
- "PLR2004", # *Magic value used in comparison
- "PLW0603", # Using the global statement to update `_value` is discouraged
-
- # path
- "PTH104", # `os.rename()` should be replaced by `Path.rename()`
- "PTH108", # `os.unlink()` should be replaced by `Path.unlink()`
- "PTH109", # `os.getcwd()` should be replaced by `Path.cwd()`
- "PTH112", # `os.path.isdir()` should be replaced by `Path.is_dir()`
- "PTH116", # `os.stat()` should be replaced by `Path.stat()`, `Path.owner()`, or `Path.group()`
- "PTH118", # `os.path.join()` should be replaced by `Path` with `/` operator
- "PTH120", # `os.path.dirname()` should be replaced by `Path.parent`
- "PTH123", # `open()` should be replaced by `Path.open()`
- "PTH201", # *Do not pass the current directory explicitly to `Path`
-
- "RSE102", # *Unnecessary parentheses on raised exception
- "RUF005", # *Consider `[sys.executable, *sys.argv]` instead of concatenation
- "RUF010", # *Use explicit conversion flag
- "RUF012", # *Mutable class attributes should be annotated with `typing.ClassVar`
- "S101", # Use of `assert` detected
- "S105", # Possible hardcoded password assigned
- "S310", # Audit URL open for permitted schemes. Allowing use of `file:` or custom schemes is often unexpected.
- "S606", # Starting a process without a shell
- "S701", # Using jinja2 templates with `autoescape=False` is dangerous and can lead to XSS.
- # Ensure `autoescape=True` or use the `select_autoescape` function.
- "SIM105", # Use `contextlib.suppress(FileNotFoundError)` instead of `try`-`except`-`pass`
- "SIM118", # *Use `key in dict` instead of `key in dict.keys()
- "T201", # `print` found
- "TD003", # Missing issue link on the line following this TODO
-
- "UP012", # Unnecessary call to `encode` as UTF-8
- "UP015", # Unnecessary open mode parameters
- "UP036", # *Version block is outdated for minimum Python version
- "UP037", # *Remove quotes from type annotation
-
- # type-checking
- "TCH001", # *Move application import into a type-checking block
- "TCH003", # *Move standard library import into a type-checking block
-]
+# ignore = [
+# # * candidate for remove in the future
+
+# # Annotations
+# "ANN401", # Dynamically typed expressions (typing.Any) are disallowed
+
+# # Arguments
+# "ARG001", # Unused function argument
+# "ARG002", # Unused method argument
+# "ARG004", # Unused static method argument
+
+# "B009", # *Replace `getattr` with attribute access
+# "BLE001", # *Do not catch blind exception
+# "C408", # *Unnecessary `tuple` call (rewrite as a literal)
+# "C417", # *Unnecessary `map` usage (rewrite using a `list` comprehension)
+# "COM812", # Trailing comma missing
+
+# # docstring
+# "D100", # Missing docstring in public module
+# "D101", # Missing docstring in public class
+# "D102", # Missing docstring in public method
+# "D103", # Missing docstring in public function
+# "D104", # Missing docstring in public package
+# "D105", # Missing docstring in magic method (__int__, __str__, ...)
+# "D106", # Missing docstring in public nested class
+# "D107", # Missing docstring in `__init__`
+# # docstring: lines
+# "D202", # *No blank lines allowed after function docstring
+# # docstring: incompatible
+# "D203", # No blank lines allowed before class docstring; incompatible with D211
+# "D212", # Multi-line docstring summary should start at the first line; incompatible with D213
+
+# # exceptions
+# "EM101", # *Exception must not use a string literal, assign to variable first
+# "EM102", # *Exception must not use an f-string literal, assign to variable first
+# "TRY003", # Avoid specifying long messages outside the exception class
+# "TRY004", # *Prefer `TypeError` exception for invalid type
+# "TRY301", # *Abstract `raise` to an inner function
+# "TRY400", # *Use `logging.exception` instead of `logging.error`
+
+# "ERA001", # *Found commented-out code
+# "FA100", # (remove in py3.10) Add `from __future__ import annotations` to simplify typing
+
+# # FIXME and TODO
+# "FIX001",
+# "FIX002", # Missing issue link on the line following this TODO
+# "TD001",
+# "TD002",
+# "TD004",
+
+# # boolean
+# "FBT001", # Boolean-typed positional argument in function definition
+# "FBT002", # Boolean default positional argument in function definition
+# "FBT003", # Boolean positional value in function call
+
+# # logging
+# "G004", # *Logging statement uses f-string
+# "G201", # *Logging `.exception(...)` should be used instead of `.error(..., exc_info=True)`
+
+# "ISC001", # *Single-line implicit string concatenation
+# "PERF203", # `try`-`except` within a loop incurs performance overhead
+# "PLR2004", # *Magic value used in comparison
+# "PLW0603", # Using the global statement to update `_value` is discouraged
+
+# # path
+# "PTH104", # `os.rename()` should be replaced by `Path.rename()`
+# "PTH108", # `os.unlink()` should be replaced by `Path.unlink()`
+# "PTH109", # `os.getcwd()` should be replaced by `Path.cwd()`
+# "PTH112", # `os.path.isdir()` should be replaced by `Path.is_dir()`
+# "PTH116", # `os.stat()` should be replaced by `Path.stat()`, `Path.owner()`, or `Path.group()`
+# "PTH118", # `os.path.join()` should be replaced by `Path` with `/` operator
+# "PTH120", # `os.path.dirname()` should be replaced by `Path.parent`
+# "PTH123", # `open()` should be replaced by `Path.open()`
+# "PTH201", # *Do not pass the current directory explicitly to `Path`
+
+# "RSE102", # *Unnecessary parentheses on raised exception
+# "RUF005", # *Consider `[sys.executable, *sys.argv]` instead of concatenation
+# "RUF010", # *Use explicit conversion flag
+# "RUF012", # *Mutable class attributes should be annotated with `typing.ClassVar`
+# "S101", # Use of `assert` detected
+# "S105", # Possible hardcoded password assigned
+# "S310", # Audit URL open for permitted schemes. Allowing use of `file:` or custom schemes is often unexpected.
+# "S606", # Starting a process without a shell
+# "S701", # Using jinja2 templates with `autoescape=False` is dangerous and can lead to XSS.
+# # Ensure `autoescape=True` or use the `select_autoescape` function.
+# "SIM105", # Use `contextlib.suppress(FileNotFoundError)` instead of `try`-`except`-`pass`
+# "SIM118", # *Use `key in dict` instead of `key in dict.keys()
+# "T201", # `print` found
+# "TD003", # Missing issue link on the line following this TODO
+
+# "UP012", # Unnecessary call to `encode` as UTF-8
+# "UP015", # Unnecessary open mode parameters
+# "UP036", # *Version block is outdated for minimum Python version
+# "UP037", # *Remove quotes from type annotation
+
+# # type-checking
+# "TCH001", # *Move application import into a type-checking block
+# "TCH003", # *Move standard library import into a type-checking block
+# ]
exclude = [
- "tests/python/knot_resolver*",
- "tests/python/knot_resolver/utils/modeling/types/path_testing*",
+ # "tests/python/knot_resolver*",
+ # "tests/python/knot_resolver/utils/modeling/types/path_testing*",
]
[tool.ruff.lint.isort]
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"
+# exclude = ["tests/python/knot_resolver/utils/modeling/types/path_testing"]
[tool.mypy]
python_version = "3.8"
from __future__ import annotations
+from knot_resolver.utils.modeling import DataModel
+
from .templates import LOADER_TEMPLATE, WORKER_TEMPLATE
-class KresConfig:
+class KresConfig(DataModel):
+ """ """
+
def render_lua_worker(self) -> str:
return WORKER_TEMPLATE.render(cfg=self)
--- /dev/null
+from __future__ import annotations
+
+from typing import Literal
+
+# Policy actions
+PolicyActionEnum = Literal[
+ # Nonchain actions
+ "pass",
+ "deny",
+ "drop",
+ "refuse",
+ "tc",
+ "reroute",
+ "answer",
+ # Chain actions
+ "mirror",
+ "forward",
+ "stub",
+ "debug-always",
+ "debug-cache-miss",
+ "qtrace",
+ "reqtrace",
+]
+
+# FLAGS from https://www.knot-resolver.cz/documentation/latest/lib.html?highlight=options#c.kr_qflags
+PolicyFlagEnum = Literal[
+ "no-minimize",
+ "no-ipv4",
+ "no-ipv6",
+ "tcp",
+ "resolved",
+ "await-ipv4",
+ "await-ipv6",
+ "await-cut",
+ "no-edns",
+ "cached",
+ "no-cache",
+ "expiring",
+ "allow_local",
+ "dnssec-want",
+ "dnssec-bogus",
+ "dnssec-insecure",
+ "dnssec-cd",
+ "stub",
+ "always-cut",
+ "dnssec-wexpand",
+ "permissive",
+ "strict",
+ "badcookie-again",
+ "cname",
+ "reorder-rr",
+ "trace",
+ "no-0x20",
+ "dnssec-nods",
+ "dnssec-optout",
+ "nonauth",
+ "forward",
+ "dns64-mark",
+ "cache-tried",
+ "no-ns-found",
+ "pkt-is-sane",
+ "dns64-disable",
+]
+
+# DNS records from 'kres.type' table
+DNSRecordTypeEnum = Literal[
+ "A",
+ "A6",
+ "AAAA",
+ "AFSDB",
+ "ANY",
+ "APL",
+ "ATMA",
+ "AVC",
+ "AXFR",
+ "CAA",
+ "CDNSKEY",
+ "CDS",
+ "CERT",
+ "CNAME",
+ "CSYNC",
+ "DHCID",
+ "DLV",
+ "DNAME",
+ "DNSKEY",
+ "DOA",
+ "DS",
+ "EID",
+ "EUI48",
+ "EUI64",
+ "GID",
+ "GPOS",
+ "HINFO",
+ "HIP",
+ "HTTPS",
+ "IPSECKEY",
+ "ISDN",
+ "IXFR",
+ "KEY",
+ "KX",
+ "L32",
+ "L64",
+ "LOC",
+ "LP",
+ "MAILA",
+ "MAILB",
+ "MB",
+ "MD",
+ "MF",
+ "MG",
+ "MINFO",
+ "MR",
+ "MX",
+ "NAPTR",
+ "NID",
+ "NIMLOC",
+ "NINFO",
+ "NS",
+ "NSAP",
+ "NSAP-PTR",
+ "NSEC",
+ "NSEC3",
+ "NSEC3PARAM",
+ "NULL",
+ "NXT",
+ "OPENPGPKEY",
+ "OPT",
+ "PTR",
+ "PX",
+ "RKEY",
+ "RP",
+ "RRSIG",
+ "RT",
+ "SIG",
+ "SINK",
+ "SMIMEA",
+ "SOA",
+ "SPF",
+ "SRV",
+ "SSHFP",
+ "SVCB",
+ "TA",
+ "TALINK",
+ "TKEY",
+ "TLSA",
+ "TSIG",
+ "TXT",
+ "UID",
+ "UINFO",
+ "UNSPEC",
+ "URI",
+ "WKS",
+ "X25",
+ "ZONEMD",
+]
+from .data_model import DataModel
+from .data_store import DataStore
+
+__all__ = [
+ "DataModel",
+ "DataStore",
+]
--- /dev/null
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+from .data_mode_node import DataModelNode
+
+
+class DataMapper:
+ def _assign_field(self, obj: Any, name: str, python_type: Any, value: Any, object_path: str) -> None:
+ value = self.map_object(python_type, value, object_path=f"{object_path}/{name}")
+ setattr(obj, name, value)
+
+ def _assign_fields(self, obj: Any, source: Union[Dict[str, Any], "BaseSchema", None], object_path: str) -> Set[str]: # noqa: C901
+ """
+ Assign fields and values.
+
+ Order of assignment:
+ 1. all direct assignments
+ 2. assignments with conversion method
+ """
+ cls = obj.__class__
+ annot = get_annotations(cls)
+ errs: List[DataValidationError] = []
+
+ used_keys: Set[str] = set()
+ for name, python_type in annot.items():
+ try:
+ if is_internal_field_name(name):
+ continue
+
+ # populate field
+ if source is None:
+ self._assign_default(obj, name, python_type, object_path)
+
+ # check for invalid configuration with both transformation function and default value
+ elif hasattr(obj, f"_{name}") and hasattr(obj, name):
+ raise RuntimeError(
+ f"Field '{obj.__class__.__name__}.{name}' has default value and transformation function at"
+ " the same time. That is now allowed. Store the default in the transformation function."
+ )
+
+ # there is a transformation function to create the value
+ elif hasattr(obj, f"_{name}") and callable(getattr(obj, f"_{name}")):
+ val = self._get_converted_value(obj, name, source, object_path)
+ self._assign_field(obj, name, python_type, val, object_path)
+ used_keys.add(name)
+
+ # source just contains the value
+ elif name in source:
+ val = source[name]
+ self._assign_field(obj, name, python_type, val, object_path)
+ used_keys.add(name)
+
+ # there is a default value, or the type is optional => store the default or null
+ elif hasattr(obj, name) or is_optional(python_type):
+ self._assign_default(obj, name, python_type, object_path)
+
+ # we expected a value but it was not there
+ else:
+ errs.append(DataValidationError(f"missing attribute '{name}'.", object_path))
+ except DataValidationError as e:
+ errs.append(e)
+
+ if len(errs) == 1:
+ raise errs[0]
+ if len(errs) > 1:
+ raise AggregateDataValidationError(object_path, errs)
+ return used_keys
+
+ def object_constructor(self, node: DataModelNode, source: dict[Any, Any], tree_path: str, base_path: Path) -> None:
+ # assign fields
+ used_keys = self._assign_fields(obj, source, object_path)
+
+ # check for unused keys in the source object
+ if source and not isinstance(source, BaseSchema):
+ unused = source.keys() - used_keys
+ if len(unused) > 0:
+ keys = ", ".join(f"'{u}'" for u in unused)
+ raise DataValidationError(
+ f"unexpected extra key(s) {keys}",
+ object_path,
+ )
+
+ # validate the constructed value
+ try:
+ obj._validate() # noqa: SLF001
+ except ValueError as e:
+ raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path or "/") from e
+
+ # def _create_tuple(self, typ: Type[Any], obj: Tuple[Any, ...], object_path: str) -> Tuple[Any, ...]:
+ # types = get_generic_type_arguments(tp)
+ # errs: List[DataValidationError] = []
+ # res: List[Any] = []
+ # for i, (t, val) in enumerate(zip(types, obj)):
+ # try:
+ # res.append(self.map_object(t, val, object_path=f"{object_path}[{i}]"))
+ # except DataValidationError as e:
+ # errs.append(e)
+ # if len(errs) == 1:
+ # raise errs[0]
+ # if len(errs) > 1:
+ # raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ # return tuple(res)
+
+ # def _create_dict(self, tp: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
+ # key_type, val_type = get_generic_type_arguments(tp)
+ # try:
+ # errs: List[DataValidationError] = []
+ # res: Dict[Any, Any] = {}
+ # for key, val in obj.items():
+ # try:
+ # nkey = self.map_object(key_type, key, object_path=f"{object_path}[{key}]")
+ # nval = self.map_object(val_type, val, object_path=f"{object_path}[{key}]")
+ # res[nkey] = nval
+ # except DataValidationError as e:
+ # errs.append(e)
+ # if len(errs) == 1:
+ # raise errs[0]
+ # if len(errs) > 1:
+ # raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ # except AttributeError as e:
+ # raise DataValidationError(
+ # f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
+ # ) from e
+ # else:
+ # return res
+
+ # def _create_list(self, tp: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
+ # if isinstance(obj, str):
+ # raise DataValidationError("expected list, got string", object_path)
+
+ # inner_type = get_generic_type_argument(tp)
+ # errs: List[DataValidationError] = []
+ # res: List[Any] = []
+
+ # try:
+ # for i, val in enumerate(obj):
+ # res.append(self.map_object(inner_type, val, object_path=f"{object_path}[{i}]"))
+ # if len(res) == 0:
+ # raise DataValidationError("empty list is not allowed", object_path)
+ # except DataValidationError as e:
+ # errs.append(e)
+ # except TypeError as e:
+ # errs.append(DataValidationError(str(e), object_path))
+
+ # if len(errs) == 1:
+ # raise errs[0]
+ # if len(errs) > 1:
+ # raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ # return res
+
+ # def _create_str(self, obj: Any, object_path: str) -> str:
+ # # we are willing to cast any primitive value to string, but no compound values are allowed
+ # if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
+ # return str(obj)
+ # if is_obj_type(obj, bool):
+ # raise DataValidationError(
+ # "Expected str, found bool. Be careful, that YAML parsers consider even"
+ # ' "no" and "yes" as a bool. Search for the Norway Problem for more'
+ # " details. And please use quotes explicitly.",
+ # object_path,
+ # )
+ # raise DataValidationError(
+ # f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
+ # )
+
+ # def _create_int(self, obj: Any, object_path: str) -> int:
+ # # we don't want to make an int out of anything else than other int
+ # # except for BaseValueType class instances
+ # if is_obj_type(obj, int) or isinstance(obj, BaseValueType):
+ # return int(obj)
+ # raise DataValidationError(f"expected int, found {type(obj)}", object_path)
+
+ # def _create_union(self, tp: Type[T], obj: Any, object_path: str) -> T:
+ # variants = get_generic_type_arguments(tp)
+ # errs: List[DataValidationError] = []
+ # for v in variants:
+ # try:
+ # return self.map_object(v, obj, object_path=object_path)
+ # except DataValidationError as e:
+ # errs.append(e)
+
+ # raise DataValidationError("could not parse any of the possible variants", object_path, child_exceptions=errs)
+
+ # def _create_optional(self, tp: Type[Optional[T]], obj: Any, object_path: str) -> Optional[T]:
+ # inner: Type[Any] = get_optional_inner_type(tp)
+ # if obj is None:
+ # return None
+ # return self.map_object(inner, obj, object_path=object_path)
+
+ # def _create_bool(self, obj: Any, object_path: str) -> bool:
+ # if is_obj_type(obj, bool):
+ # return obj
+ # raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
+
+ # def _create_literal(self, tp: Type[Any], obj: Any, object_path: str) -> Any:
+ # args = get_generic_type_arguments(tp)
+
+ # expected = []
+ # if sys.version_info < (3, 9):
+ # for arg in args:
+ # if is_literal(arg):
+ # expected += get_generic_type_arguments(arg)
+ # else:
+ # expected.append(arg)
+ # else:
+ # expected = args
+
+ # if obj in expected:
+ # return obj
+ # raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
+
+ # def _create_base_schema_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseSchema":
+ # if isinstance(obj, (dict, BaseSchema)):
+ # return tp(obj, object_path=object_path)
+ # raise DataValidationError(f"expected 'dict' or 'NoRenameBaseSchema' object, found '{type(obj)}'", object_path)
+
+ # def create_value_type_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseValueType":
+ # if isinstance(obj, tp):
+ # # if we already have a custom value type, just pass it through
+ # return obj
+ # # no validation performed, the implementation does it in the constuctor
+ # try:
+ # return tp(obj, object_path=object_path)
+ # except ValueError as e:
+ # if len(e.args) > 0 and isinstance(e.args[0], str):
+ # msg = e.args[0]
+ # else:
+ # msg = f"Failed to validate value against {tp} type"
+ # raise DataValidationError(msg, object_path) from e
+
+ # def _create_default(self, obj: Any) -> Any:
+ # if isinstance(obj, _LazyDefault):
+ # return obj.instantiate()
+ # return obj
+
+ # def map_object( # noqa: C901, PLR0911, PLR0912
+ # self,
+ # tp: Type[Any],
+ # obj: Any,
+ # default: Any = ...,
+ # use_default: bool = False,
+ # object_path: str = "/",
+ # ) -> Any:
+ # """
+ # Given an expected type `cls` and a value object `obj`.
+
+ # Return a new object of the given type and map fields of `obj` into it.
+ # During the mapping procedure, runtime type checking is performed.
+ # """
+ # # Disabling these checks, because I think it's much more readable as a single function
+ # # and it's not that large at this point. If it got larger, then we should definitely split it
+ # # pylint: disable=too-many-branches,too-many-locals,too-many-statements
+
+ # # default values
+ # if obj is None and use_default:
+ # return self._create_default(default)
+
+ # # NoneType
+ # if is_none_type(tp):
+ # if obj is None:
+ # return None
+ # raise DataValidationError(f"expected None, found '{obj}'.", object_path)
+
+ # # Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
+ # if is_optional(tp):
+ # return self._create_optional(tp, obj, object_path)
+
+ # # Union[*variants]
+ # if is_union(tp):
+ # return self._create_union(tp, obj, object_path)
+
+ # # after this, there is no place for a None object
+ # if obj is None:
+ # raise DataValidationError(f"unexpected value 'None' for type {tp}", object_path)
+
+ # # int
+ # if tp is int:
+ # return self._create_int(obj, object_path)
+
+ # # str
+ # if tp is str:
+ # return self._create_str(obj, object_path)
+
+ # # bool
+ # if tp is bool:
+ # return self._create_bool(obj, object_path)
+
+ # # float
+ # if tp is float:
+ # raise NotImplementedError(
+ # "Floating point values are not supported in the object mapper."
+ # " Please implement them and be careful with type coercions"
+ # )
+
+ # # Literal[T]
+ # if is_literal(tp):
+ # return self._create_literal(tp, obj, object_path)
+
+ # # Dict[K,V]
+ # if is_dict(tp):
+ # return self._create_dict(tp, obj, object_path)
+
+ # # any Enums (probably used only internally in DataValidator)
+ # if is_enum(tp):
+ # if isinstance(obj, tp):
+ # return obj
+ # raise DataValidationError(f"unexpected value '{obj}' for enum '{tp}'", object_path)
+
+ # # List[T]
+ # if is_list(tp):
+ # return self._create_list(tp, obj, object_path)
+
+ # # Tuple[A,B,C,D,...]
+ # if is_tuple(tp):
+ # return self._create_tuple(tp, obj, object_path)
+
+ # # type of obj and cls type match
+ # if is_obj_type(obj, tp):
+ # return obj
+
+ # # when the specified type is Any, just return the given value
+ # # on mypy version 1.11.0 comparison-overlap error started popping up
+ # # https://github.com/python/mypy/issues/17665
+ # if tp == Any: # type: ignore[comparison-overlap]
+ # return obj
+
+ # # BaseValueType subclasses
+ # if inspect.isclass(tp) and issubclass(tp, BaseValueType):
+ # return self.create_value_type_object(tp, obj, object_path)
+
+ # # BaseGenericTypeWrapper subclasses
+ # if is_generic_type_wrapper(tp):
+ # inner_type = get_generic_type_wrapper_argument(tp)
+ # obj_valid = self.map_object(inner_type, obj, object_path)
+ # return tp(obj_valid, object_path=object_path)
+
+ # # nested BaseSchema subclasses
+ # if inspect.isclass(tp) and issubclass(tp, BaseSchema):
+ # return self._create_base_schema_object(tp, obj, object_path)
+
+ # # if the object matches, just pass it through
+ # if inspect.isclass(tp) and isinstance(obj, tp):
+ # return obj
+
+ # # default error handler
+ # raise DataValidationError(
+ # f"Type {tp} cannot be parsed. This is a implementation error. "
+ # "Please fix your types in the class or improve the parser/validator.",
+ # object_path,
+ # )
+
+ # def is_obj_type_valid(self, obj: Any, tp: Type[Any]) -> bool:
+ # """Runtime type checking. Validate, that a given object is of a given type."""
+ # try:
+ # self.map_object(tp, obj)
+ # except (DataValidationError, ValueError):
+ # return False
+ # else:
+ # return True
+
+ # def _assign_default(self, obj: Any, name: str, python_type: Any, object_path: str) -> None:
+ # cls = obj.__class__
+
+ # try:
+ # default = self._create_default(getattr(cls, name, None))
+ # except ValueError as e:
+ # raise DataValidationError(str(e), f"{object_path}/{name}") from e
+
+ # value = self.map_object(python_type, default, object_path=f"{object_path}/{name}")
+ # setattr(obj, name, value)
+
+ # def _get_converted_value(self, obj: Any, key: str, source: TSource, object_path: str) -> Any:
+ # """Get a value of a field by invoking appropriate transformation function."""
+ # try:
+ # func = getattr(obj.__class__, f"_{key}")
+ # argc = len(inspect.signature(func).parameters)
+ # if argc == 1:
+ # # it is a static method
+ # return func(source)
+ # if argc == 2:
+ # # it is a instance method
+ # return func(_create_untouchable("obj"), source)
+ # raise RuntimeError("Transformation function has wrong number of arguments")
+ # except ValueError as e:
+ # msg = e.args[0] if len(e.args) > 0 and isinstance(e.args[0], str) else "Failed to validate value type"
+ # raise DataValidationError(msg, object_path) from e
+
+ # def object_constructor(self, obj: Any, source: Union["BaseSchema", Dict[Any, Any]], object_path: str) -> None:
+ # """
+ # Construct object. Delegated constructor for the NoRenameBaseSchema class.
+
+ # The reason this method is delegated to the mapper is due to renaming. Like this, we don't have to
+ # worry about a different BaseSchema class, when we want to have dynamically renamed fields.
+ # """
+ # # As this is a delegated constructor, we must ignore protected access warnings
+
+ # # sanity check
+ # if not isinstance(source, (BaseSchema, dict)):
+ # raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path)
+
+ # # construct lower level schema first if configured to do so
+ # if obj._LAYER is not None: # noqa: SLF001
+ # source = obj._LAYER(source, object_path=object_path) # pylint: disable=not-callable # noqa: SLF001
+
+ # # assign fields
+ # used_keys = self._assign_fields(obj, source, object_path)
+
+ # # check for unused keys in the source object
+ # if source and not isinstance(source, BaseSchema):
+ # unused = source.keys() - used_keys
+ # if len(unused) > 0:
+ # keys = ", ".join(f"'{u}'" for u in unused)
+ # raise DataValidationError(
+ # f"unexpected extra key(s) {keys}",
+ # object_path,
+ # )
+
+ # # validate the constructed value
+ # try:
+ # obj._validate() # noqa: SLF001
+ # except ValueError as e:
+ # raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path or "/") from e
--- /dev/null
+from __future__ import annotations
+
+import enum
+import inspect
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, TypeVar
+
+from .context import Context, Strictness
+from .errors import DataDescriptionError
+from .parsing import ParsedData, ParsedDataWrapper
+from .types.base_custom_type import BaseCustomType
+from .types.inspect import (
+ get_annotations,
+ get_base_generic_type_wrapper_argument,
+ get_optional_inner_type,
+ is_base_generic_type_wrapper,
+ is_dict,
+ is_list,
+ is_literal,
+ is_none_type,
+ is_optional,
+ is_union,
+)
+
+if TYPE_CHECKING:
+ from .pointer import JSONPointer
+
+
+# def _split_docstring(docstring: str) -> tuple[str, str] | None:
+# if "---" not in docstring:
+# return ("\n".join([s.strip() for s in docstring.splitlines()]).strip(), None)
+
+# doc, attrs_doc = docstring.split("---", maxsplit=1)
+# return (
+# "\n".join([s.strip() for s in doc.splitlines()]).strip(),
+# attrs_doc,
+# )
+
+
+# def _describe_type(typ: type[Any]) -> dict[Any, Any]:
+# if is_none_type(typ):
+# return {"type": "null"}
+# if typ is int:
+# return {"type": "integer"}
+# if typ is bool:
+# return {"type": "boolean"}
+# if typ is str:
+# return {"type": "string"}
+
+# if inspect.isclass(typ) and issubclass(typ, DataModel):
+# return typ.json_schema()
+
+# if inspect.isclass(typ) and issubclass(typ, BaseCustomType):
+# return typ.json_schema()
+
+# if is_base_generic_type_wrapper(typ):
+# wrapped = get_base_generic_type_wrapper_argument(typ)
+# return _describe_type(wrapped)
+
+# if is_literal(typ):
+# lit: list[str] = []
+# args = inspect.get_args(typ)
+# for arg in args:
+# if is_literal(arg):
+# lit += inspect.get_args(arg)
+# else:
+# lit.append(arg)
+# return {"type": "string", "enum": lit}
+
+# if is_optional(typ):
+# desc = _describe_type(get_optional_inner_type(typ))
+# if "type" in desc:
+# desc["type"] = [desc["type"], "null"]
+# return desc
+# return {"anyOf": [{"type": "null"}, desc]}
+
+# if is_union(typ):
+# variants = inspect.get_args(typ)
+# return {"anyOf": [_describe_type(v) for v in variants]}
+
+# if is_list(typ):
+# return {"type": "array", "items": _describe_type(inspect.get_args(typ)[0])}
+
+# if is_dict(typ):
+# key, value = inspect.get_args(typ)
+
+# if inspect.isclass(key) and issubclass(key, BaseCustomType):
+# assert (
+# key.__str__ is not BaseCustomType.__str__
+# ), "To support derived 'BaseValueType', __str__ must be implemented."
+# else:
+# assert key is str, "We currently do not support any other keys then strings"
+
+# return {"type": "object", "additionalProperties": _describe_type(value)}
+
+# msg = f"JSON schema for type '{typ}' is not implemented"
+# raise NotImplementedError(msg)
+
+
+# def _get_attrs_docs(docstring: str) -> dict[str, str] | None:
+# attrs_docs = _split_docstring(docstring)[1]
+# if attrs_docs is None:
+# return None
+# return parse_yaml(attrs_docs)
+
+
+def _get_json_schema_description(model_type: type[DataModel]) -> str:
+ return ""
+
+
+# docstring = inspect.get_doc(node_type)
+# if not docstring:
+# msg = f"missing docstring for '{node_type}"
+# raise DataDescriptionError(msg, node_path)
+# return _split_docstring(docstring)[0]
+
+
+def _get_json_schema_properties(model_type: type[DataModel]) -> dict[Any, Any]:
+ return {}
+
+
+# schema: dict[str, Any] = {}
+
+# docstring = inspect.get_doc(model_type)
+# if not docstring:
+# msg = f"missing docstring for '{model_type}'"
+# raise DataDescriptionError(msg)
+# attrs_docs = _get_attrs_docs(docstring)
+
+# annotations: dict[str, Any] = get_annotations(model_type)
+# for attr_name, attr_type in annotations.items():
+# name = attr_name.replace("_", "-")
+# schema[name] = _describe_type(attr_type)
+
+# description = attrs_docs.pop(attr_name, None)
+# if description is None:
+# msg = f"missing description for '{attr_name}' in docstring for '{model_type}'"
+# raise DataDescriptionError(msg)
+# schema[name]["description"] = description
+
+# default = getattr(model_type, attr_name, None)
+# if default:
+# schema[name]["default"] = default
+
+# if attrs_docs:
+# msg = f"additional description in '{model_type}' docstring: {tuple(attrs_docs.keys())}"
+# raise DataDescriptionError(msg)
+
+# return schema
+
+
+# def data_combine(data1: DataModel, data2: DataModel, *data: DataModel) -> DataModel:
+# result =
+
+# for arg in args:
+# data_combine()
+
+
+def _assign_type():
+ pass
+
+
+def _assign_attributes(self: DataModel) -> None:
+ source: ParsedData = self._source
+ annot: dict[str, Any] = get_annotations(type(self))
+
+ if isinstance(source, ParsedDataWrapper):
+ base_path = source.file.parent
+ source = source.data
+
+ if isinstance(source, dict):
+
+
+ for key, value in source.items():
+ _key = key.replace("-", "_")
+
+ if _key in annot:
+ _type = annot[key]
+ setattr(self, _key, _type(source[key]))
+
+
+
+class DataModel:
+ """Represents the data model and the source data.
+
+ The source data is stored in its original form and is simply organized into the attributes of the data model.
+ For advanced validation, first you need to assign_defaults() and then call the validate() method.
+
+ Attributes:
+ source (ParsedData | None): Source data in the dictionary.
+ pointer (JSONPointer): A JSON pointer that indicates the current position in the data model subtree.
+ base_path (str | Path): The base path for files and directories relative paths.
+
+ """
+
+ def __init__(
+ self,
+ source: ParsedData | None = None,
+ pointer: JSONPointer = "/",
+ base_path: str | Path = Path(),
+ ) -> None:
+ self._source = source or {}
+ self._validated: bool = False
+ self._pointer = pointer
+ self._base_path = Path(base_path)
+ _assign_attributes(self)
+
+ # def append(self, additional_data: DataModel) -> None:
+ # def data_combine(data1: DataModel, data2: DataModel) -> None:
+ # if type(data1) is not type(data2):
+ # # error here
+ # pass
+
+ # annot: dict[str, Any] = get_annotations(type(self))
+ # for attr_name, attr_type in annot.items():
+ # name = attr_name.replace("_", "-")
+
+ # if not hasattr(self, attr_name) and hasattr(additional_data, attr_name):
+ # attr_value = getattr(additional_data, attr_name)
+ # setattr(self, attr_name, attr_value)
+
+ # if inspect.isclass(attr_type) and issubclass(attr_type, DataModel):
+ # if hasattr(data1, attr_name) and hasattr(data2, attr_name):
+ # data_combine(getattr(data1, attr_name), getattr(data2, attr_name))
+
+ # data_combine(self, additional_data)
+
+ def assign_defaults(self) -> None:
+ """Assign default values to missing attributes.
+
+ A value is assigned if attribute is missing and default value is available for it.
+ Without default values, data validation may fail even if the data is valid.
+ """
+ cls = type(self)
+ annot: dict[str, Any] = get_annotations(cls)
+
+ for key in annot:
+ if not hasattr(key, key) and hasattr(cls, key):
+ default = getattr(cls, key)
+ setattr(self, key, default)
+
+ def _validate_subtree(self, context: Context) -> None:
+ for attr_value in vars(self).values():
+ if isinstance(attr_value, BaseCustomType):
+ attr_value.validate(context)
+
+ def _validate(self, context: Context) -> None:
+ """Validate all data under the node subtree.
+
+ All validation should be done here.
+ This method is automatically called during validation.
+ """
+
+ def validate(self, context: Context | None = None) -> None:
+ """Validate all data under the node subtree.
+
+ At least BASIC validation strictness is required
+ for the data value to be considered valid.
+ If validation is successful, no error is raised.
+
+ Args:
+ context (Context | None):
+ Optional, validation context for the validation operations, e.g. validation strictness
+ or username and groupname to check permissions on paths.
+ If set to None, Context with STRICT validation strictness is used.
+
+ Raises:
+ DataTypeError: When input data type validation fails.
+ DataValueError: When input data value validation fails.
+ DataValidationError: When some other input data validation fails.
+
+ """
+ if context is None:
+ # STRICT validation is used by default
+ context = Context(strictness=Strictness.STRICT)
+
+ self._validate_subtree(context)
+ self._validate(context)
+ if context.strictness > Strictness.NORMAL:
+ self._validated = True
+
+ @classmethod
+ def json_schema_minimal(cls) -> dict[str, Any]:
+ """Get minimal JSON schema without any additional metadata.
+
+ Returns:
+ Minimal JSON schema definition of the entire data model subtree without any additional metadata.
+
+ """
+ return {
+ "type": "object",
+ "description": _get_json_schema_description(cls),
+ "properties": _get_json_schema_properties(cls),
+ }
+
+ @classmethod
+ def json_schema(cls, schema_id: str, schema_title: str, schema_description: str) -> dict[str, Any]:
+ """Get JSON schema with metadata.
+
+ Args:
+ schema_id (str): Unique URI for the JSON schema.
+ schema_title (str): Title for the JSON schema.
+ schema_description (str): Description for the JSON schema.
+
+ Returns:
+ Proper JSON schema definition of the entire data model subtree
+ with additional metadata ($schema, $id, title and description).
+
+ """
+ return {
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$id": schema_id,
+ "title": schema_title,
+ "type": "object",
+ "description": schema_description,
+ "properties": _get_json_schema_properties(cls),
+ }
--- /dev/null
+from __future__ import annotations
+
+from asyncio import Lock
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from .parsing import try_to_parse_file
+
+if TYPE_CHECKING:
+ from .data_model import DataModel
+
+
+class DataStore:
+ """Used to load, store and manage a data.
+
+ Attributes:
+ files (list[str | Path]):
+ A list of configuration files from which the configuration should be loaded.
+
+ """
+
+ def __init__(self, files: list[str | Path], model: type[DataModel]) -> None:
+ self._model = model
+ self._files = files
+
+ # self._verifier: list[Any] = []
+ # self._callbacks: list[Any] = []
+ self._lock: Lock = Lock()
+
+ def load_files(self) -> None:
+ data: DataModel = self._model()
+ for file in self._files:
+ file_path = Path(file)
+ file_parsed_data = try_to_parse_file(file_path)
+
+ base_path = file_path.parent
+ file_data: DataModel = self._model(file_parsed_data, base_path=base_path)
+ # combined_data.append(file_data)
+
+ raise ValueError
+ # config.validate()
self.file = Path(file)
-ParsedData = Union[Dict[str, "ParsedData"], List["ParsedData"], ParsedDataWrapper, str, int, float, bool, None]
+ParsedDataBase = Union[List["ParsedDataBase"], ParsedDataWrapper, str, int, float, bool, None]
+
+ParsedData = Union[Dict[str, "ParsedDataBase"], ParsedDataWrapper]
def _yaml_include_constructor(self: _YAMLRaiseDuplicatesIncludeLoader, node: MappingNode) -> ParsedDataWrapper:
--- /dev/null
+from __future__ import annotations
+
+JSONPointer = str
+
+
+# /path/to/config.yaml::cache/size:
+# /api::cache:size:
--- /dev/null
+{
+ "test": "this is test"
+}
--- /dev/null
+test: this not is test
--- /dev/null
+import pytest
+
+# from typing import Any
+
+from knot_resolver.utils.modeling.data_model import DataModel
+# from knot_resolver.utils.modeling.errors import DataDescriptionError
+from knot_resolver.utils.modeling.parsing import try_to_parse_file
+
+
+
+class TestModel(DataModel):
+ test: str = "default value"
+
+
+
+
+
+
+# class NoDescription(DataModelNode):
+# pass
+
+
+# class SingleLineDescription(DataModelNode):
+# """
+# Single line description.
+# """
+
+
+# class MultiLineDescription(DataModelNode):
+# """
+# Multi line
+# description.
+# """
+
+
+# class AttributesDescription(DataModelNode):
+# """
+# Data model node description.
+# ---
+# integer_attr: Description for the integer attribute.
+# string_attr: Description for the string attribute.
+# """
+
+# integer: int
+# string: str
+
+
+@pytest.mark.parametrize("model", [TestModel])
+def test_data_model(model: DataModel):
+ parsed_data = try_to_parse_file("/home/amrazek/src/knot-resolver/tests/python/knot_resolver/utils/modeling/config.test.yaml")
+ modeled = TestModel(parsed_data)
+
+ assert modeled.test == "this is test"
+
+
+# @pytest.mark.parametrize("model", [AttributesDescription])
+# def test_json_schema(model: Any):
+# schema = model.json_schema()
+
+
+# @pytest.mark.parametrize("model", [NoDescription, SingleLineDescription, MultiLineDescription])
+# def test_json_schema_invalid(model: Any):
+# with pytest.raises(DataDescriptionError):
+# model.json_schema()
+
+
+# # class FieldsDescription(ConfigSchema):
+# # """
+# # This is an awesome test class
+# # ---
+# # field: This field does nothing interesting
+# # value: Neither does this
+# # """
+
+# # field: str
+# # value: int
+
+# # schema = FieldsDescription.json_schema()
+# # assert schema["description"] == "This is an awesome test class"
+# # assert schema["properties"]["field"]["description"] == "This field does nothing interesting"
+# # assert schema["properties"]["value"]["description"] == "Neither does this"
+
+# # class NoDescription(ConfigSchema):
+# # nothing: str
+
+# # _ = NoDescription.json_schema()
+
+
+# # def test_docstring_parsing_invalid():
+# # class AdditionalItem(ConfigSchema):
+# # """
+# # This class is wrong
+# # ---
+# # field: nope
+# # nothing: really nothing
+# # """
+
+# # nothing: str
+
+# # with raises(DataDescriptionError):
+# # _ = AdditionalItem.json_schema()
+
+# # class WrongDescription(ConfigSchema):
+# # """
+# # This class is wrong
+# # ---
+# # other: description
+# # """
+
+# # nothing: str
+
+# # with raises(DataDescriptionError):
+# # _ = WrongDescription.json_schema()
--- /dev/null
+from pathlib import Path
+
+import pytest
+
+from knot_resolver.utils.modeling import DataModel, DataStore
+
+base_path = Path(__file__).parent
+
+
+class TestModel(DataModel):
+ pass
+
+
+# def test_data_store_load_files() -> None:
+# files = [
+# base_path / "config.test.yaml",
+# base_path / "config.test.json",
+# ]
+# data_store = DataStore(files, TestModel)
+# data_store.load_files()