From: Aleš Mrázek Date: Mon, 4 May 2026 14:23:05 +0000 (+0200) Subject: wip X-Git-Url: http://git.ipfire.org/gitweb/index.cgi?a=commitdiff_plain;h=refs%2Fheads%2Fpython-refactoring-modeling;p=thirdparty%2Fknot-resolver.git wip --- diff --git a/pyproject.toml b/pyproject.toml index 995d8b8f5..e9cdcdf97 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,108 +96,108 @@ exclude = ["setup.py"] [tool.ruff.lint] select = ["ALL"] -ignore = [ - # * candidate for remove in the future - - # Annotations - "ANN401", # Dynamically typed expressions (typing.Any) are disallowed - - # Arguments - "ARG001", # Unused function argument - "ARG002", # Unused method argument - "ARG004", # Unused static method argument - - "B009", # *Replace `getattr` with attribute access - "BLE001", # *Do not catch blind exception - "C408", # *Unnecessary `tuple` call (rewrite as a literal) - "C417", # *Unnecessary `map` usage (rewrite using a `list` comprehension) - "COM812", # Trailing comma missing - - # docstring - "D100", # Missing docstring in public module - "D101", # Missing docstring in public class - "D102", # Missing docstring in public method - "D103", # Missing docstring in public function - "D104", # Missing docstring in public package - "D105", # Missing docstring in magic method (__int__, __str__, ...) - "D106", # Missing docstring in public nested class - "D107", # Missing docstring in `__init__` - # docstring: lines - "D202", # *No blank lines allowed after function docstring - # docstring: incompatible - "D203", # No blank lines allowed before class docstring; incompatible with D211 - "D212", # Multi-line docstring summary should start at the first line; incompatible with D213 - - # exceptions - "EM101", # *Exception must not use a string literal, assign to variable first - "EM102", # *Exception must not use an f-string literal, assign to variable first - "TRY003", # Avoid specifying long messages outside the exception class - "TRY004", # *Prefer `TypeError` exception for invalid type - "TRY301", # *Abstract `raise` to an inner function - "TRY400", # *Use `logging.exception` instead of `logging.error` - - "ERA001", # *Found commented-out code - "FA100", # (remove in py3.10) Add `from __future__ import annotations` to simplify typing - - # FIXME and TODO - "FIX001", - "FIX002", # Missing issue link on the line following this TODO - "TD001", - "TD002", - "TD004", - - # boolean - "FBT001", # Boolean-typed positional argument in function definition - "FBT002", # Boolean default positional argument in function definition - "FBT003", # Boolean positional value in function call - - # logging - "G004", # *Logging statement uses f-string - "G201", # *Logging `.exception(...)` should be used instead of `.error(..., exc_info=True)` - - "ISC001", # *Single-line implicit string concatenation - "PERF203", # `try`-`except` within a loop incurs performance overhead - "PLR2004", # *Magic value used in comparison - "PLW0603", # Using the global statement to update `_value` is discouraged - - # path - "PTH104", # `os.rename()` should be replaced by `Path.rename()` - "PTH108", # `os.unlink()` should be replaced by `Path.unlink()` - "PTH109", # `os.getcwd()` should be replaced by `Path.cwd()` - "PTH112", # `os.path.isdir()` should be replaced by `Path.is_dir()` - "PTH116", # `os.stat()` should be replaced by `Path.stat()`, `Path.owner()`, or `Path.group()` - "PTH118", # `os.path.join()` should be replaced by `Path` with `/` operator - "PTH120", # `os.path.dirname()` should be replaced by `Path.parent` - "PTH123", # `open()` should be replaced by `Path.open()` - "PTH201", # *Do not pass the current directory explicitly to `Path` - - "RSE102", # *Unnecessary parentheses on raised exception - "RUF005", # *Consider `[sys.executable, *sys.argv]` instead of concatenation - "RUF010", # *Use explicit conversion flag - "RUF012", # *Mutable class attributes should be annotated with `typing.ClassVar` - "S101", # Use of `assert` detected - "S105", # Possible hardcoded password assigned - "S310", # Audit URL open for permitted schemes. Allowing use of `file:` or custom schemes is often unexpected. - "S606", # Starting a process without a shell - "S701", # Using jinja2 templates with `autoescape=False` is dangerous and can lead to XSS. - # Ensure `autoescape=True` or use the `select_autoescape` function. - "SIM105", # Use `contextlib.suppress(FileNotFoundError)` instead of `try`-`except`-`pass` - "SIM118", # *Use `key in dict` instead of `key in dict.keys() - "T201", # `print` found - "TD003", # Missing issue link on the line following this TODO - - "UP012", # Unnecessary call to `encode` as UTF-8 - "UP015", # Unnecessary open mode parameters - "UP036", # *Version block is outdated for minimum Python version - "UP037", # *Remove quotes from type annotation - - # type-checking - "TCH001", # *Move application import into a type-checking block - "TCH003", # *Move standard library import into a type-checking block -] +# ignore = [ +# # * candidate for remove in the future + +# # Annotations +# "ANN401", # Dynamically typed expressions (typing.Any) are disallowed + +# # Arguments +# "ARG001", # Unused function argument +# "ARG002", # Unused method argument +# "ARG004", # Unused static method argument + +# "B009", # *Replace `getattr` with attribute access +# "BLE001", # *Do not catch blind exception +# "C408", # *Unnecessary `tuple` call (rewrite as a literal) +# "C417", # *Unnecessary `map` usage (rewrite using a `list` comprehension) +# "COM812", # Trailing comma missing + +# # docstring +# "D100", # Missing docstring in public module +# "D101", # Missing docstring in public class +# "D102", # Missing docstring in public method +# "D103", # Missing docstring in public function +# "D104", # Missing docstring in public package +# "D105", # Missing docstring in magic method (__int__, __str__, ...) +# "D106", # Missing docstring in public nested class +# "D107", # Missing docstring in `__init__` +# # docstring: lines +# "D202", # *No blank lines allowed after function docstring +# # docstring: incompatible +# "D203", # No blank lines allowed before class docstring; incompatible with D211 +# "D212", # Multi-line docstring summary should start at the first line; incompatible with D213 + +# # exceptions +# "EM101", # *Exception must not use a string literal, assign to variable first +# "EM102", # *Exception must not use an f-string literal, assign to variable first +# "TRY003", # Avoid specifying long messages outside the exception class +# "TRY004", # *Prefer `TypeError` exception for invalid type +# "TRY301", # *Abstract `raise` to an inner function +# "TRY400", # *Use `logging.exception` instead of `logging.error` + +# "ERA001", # *Found commented-out code +# "FA100", # (remove in py3.10) Add `from __future__ import annotations` to simplify typing + +# # FIXME and TODO +# "FIX001", +# "FIX002", # Missing issue link on the line following this TODO +# "TD001", +# "TD002", +# "TD004", + +# # boolean +# "FBT001", # Boolean-typed positional argument in function definition +# "FBT002", # Boolean default positional argument in function definition +# "FBT003", # Boolean positional value in function call + +# # logging +# "G004", # *Logging statement uses f-string +# "G201", # *Logging `.exception(...)` should be used instead of `.error(..., exc_info=True)` + +# "ISC001", # *Single-line implicit string concatenation +# "PERF203", # `try`-`except` within a loop incurs performance overhead +# "PLR2004", # *Magic value used in comparison +# "PLW0603", # Using the global statement to update `_value` is discouraged + +# # path +# "PTH104", # `os.rename()` should be replaced by `Path.rename()` +# "PTH108", # `os.unlink()` should be replaced by `Path.unlink()` +# "PTH109", # `os.getcwd()` should be replaced by `Path.cwd()` +# "PTH112", # `os.path.isdir()` should be replaced by `Path.is_dir()` +# "PTH116", # `os.stat()` should be replaced by `Path.stat()`, `Path.owner()`, or `Path.group()` +# "PTH118", # `os.path.join()` should be replaced by `Path` with `/` operator +# "PTH120", # `os.path.dirname()` should be replaced by `Path.parent` +# "PTH123", # `open()` should be replaced by `Path.open()` +# "PTH201", # *Do not pass the current directory explicitly to `Path` + +# "RSE102", # *Unnecessary parentheses on raised exception +# "RUF005", # *Consider `[sys.executable, *sys.argv]` instead of concatenation +# "RUF010", # *Use explicit conversion flag +# "RUF012", # *Mutable class attributes should be annotated with `typing.ClassVar` +# "S101", # Use of `assert` detected +# "S105", # Possible hardcoded password assigned +# "S310", # Audit URL open for permitted schemes. Allowing use of `file:` or custom schemes is often unexpected. +# "S606", # Starting a process without a shell +# "S701", # Using jinja2 templates with `autoescape=False` is dangerous and can lead to XSS. +# # Ensure `autoescape=True` or use the `select_autoescape` function. +# "SIM105", # Use `contextlib.suppress(FileNotFoundError)` instead of `try`-`except`-`pass` +# "SIM118", # *Use `key in dict` instead of `key in dict.keys() +# "T201", # `print` found +# "TD003", # Missing issue link on the line following this TODO + +# "UP012", # Unnecessary call to `encode` as UTF-8 +# "UP015", # Unnecessary open mode parameters +# "UP036", # *Version block is outdated for minimum Python version +# "UP037", # *Remove quotes from type annotation + +# # type-checking +# "TCH001", # *Move application import into a type-checking block +# "TCH003", # *Move standard library import into a type-checking block +# ] exclude = [ - "tests/python/knot_resolver*", - "tests/python/knot_resolver/utils/modeling/types/path_testing*", + # "tests/python/knot_resolver*", + # "tests/python/knot_resolver/utils/modeling/types/path_testing*", ] [tool.ruff.lint.isort] @@ -208,6 +208,7 @@ quote-style = "double" indent-style = "space" skip-magic-trailing-comma = false line-ending = "auto" +# exclude = ["tests/python/knot_resolver/utils/modeling/types/path_testing"] [tool.mypy] python_version = "3.8" diff --git a/python/knot_resolver/config/config.py b/python/knot_resolver/config/config.py index cf75c694c..92b0b0cbf 100644 --- a/python/knot_resolver/config/config.py +++ b/python/knot_resolver/config/config.py @@ -1,9 +1,13 @@ from __future__ import annotations +from knot_resolver.utils.modeling import DataModel + from .templates import LOADER_TEMPLATE, WORKER_TEMPLATE -class KresConfig: +class KresConfig(DataModel): + """ """ + def render_lua_worker(self) -> str: return WORKER_TEMPLATE.render(cfg=self) diff --git a/python/knot_resolver/config/types/literals.py b/python/knot_resolver/config/types/literals.py new file mode 100644 index 000000000..f0047c76b --- /dev/null +++ b/python/knot_resolver/config/types/literals.py @@ -0,0 +1,155 @@ +from __future__ import annotations + +from typing import Literal + +# Policy actions +PolicyActionEnum = Literal[ + # Nonchain actions + "pass", + "deny", + "drop", + "refuse", + "tc", + "reroute", + "answer", + # Chain actions + "mirror", + "forward", + "stub", + "debug-always", + "debug-cache-miss", + "qtrace", + "reqtrace", +] + +# FLAGS from https://www.knot-resolver.cz/documentation/latest/lib.html?highlight=options#c.kr_qflags +PolicyFlagEnum = Literal[ + "no-minimize", + "no-ipv4", + "no-ipv6", + "tcp", + "resolved", + "await-ipv4", + "await-ipv6", + "await-cut", + "no-edns", + "cached", + "no-cache", + "expiring", + "allow_local", + "dnssec-want", + "dnssec-bogus", + "dnssec-insecure", + "dnssec-cd", + "stub", + "always-cut", + "dnssec-wexpand", + "permissive", + "strict", + "badcookie-again", + "cname", + "reorder-rr", + "trace", + "no-0x20", + "dnssec-nods", + "dnssec-optout", + "nonauth", + "forward", + "dns64-mark", + "cache-tried", + "no-ns-found", + "pkt-is-sane", + "dns64-disable", +] + +# DNS records from 'kres.type' table +DNSRecordTypeEnum = Literal[ + "A", + "A6", + "AAAA", + "AFSDB", + "ANY", + "APL", + "ATMA", + "AVC", + "AXFR", + "CAA", + "CDNSKEY", + "CDS", + "CERT", + "CNAME", + "CSYNC", + "DHCID", + "DLV", + "DNAME", + "DNSKEY", + "DOA", + "DS", + "EID", + "EUI48", + "EUI64", + "GID", + "GPOS", + "HINFO", + "HIP", + "HTTPS", + "IPSECKEY", + "ISDN", + "IXFR", + "KEY", + "KX", + "L32", + "L64", + "LOC", + "LP", + "MAILA", + "MAILB", + "MB", + "MD", + "MF", + "MG", + "MINFO", + "MR", + "MX", + "NAPTR", + "NID", + "NIMLOC", + "NINFO", + "NS", + "NSAP", + "NSAP-PTR", + "NSEC", + "NSEC3", + "NSEC3PARAM", + "NULL", + "NXT", + "OPENPGPKEY", + "OPT", + "PTR", + "PX", + "RKEY", + "RP", + "RRSIG", + "RT", + "SIG", + "SINK", + "SMIMEA", + "SOA", + "SPF", + "SRV", + "SSHFP", + "SVCB", + "TA", + "TALINK", + "TKEY", + "TLSA", + "TSIG", + "TXT", + "UID", + "UINFO", + "UNSPEC", + "URI", + "WKS", + "X25", + "ZONEMD", +] diff --git a/python/knot_resolver/utils/modeling/__init__.py b/python/knot_resolver/utils/modeling/__init__.py index e69de29bb..539b1b9b1 100644 --- a/python/knot_resolver/utils/modeling/__init__.py +++ b/python/knot_resolver/utils/modeling/__init__.py @@ -0,0 +1,7 @@ +from .data_model import DataModel +from .data_store import DataStore + +__all__ = [ + "DataModel", + "DataStore", +] diff --git a/python/knot_resolver/utils/modeling/data_mapper.py b/python/knot_resolver/utils/modeling/data_mapper.py new file mode 100644 index 000000000..814a5c123 --- /dev/null +++ b/python/knot_resolver/utils/modeling/data_mapper.py @@ -0,0 +1,424 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from .data_mode_node import DataModelNode + + +class DataMapper: + def _assign_field(self, obj: Any, name: str, python_type: Any, value: Any, object_path: str) -> None: + value = self.map_object(python_type, value, object_path=f"{object_path}/{name}") + setattr(obj, name, value) + + def _assign_fields(self, obj: Any, source: Union[Dict[str, Any], "BaseSchema", None], object_path: str) -> Set[str]: # noqa: C901 + """ + Assign fields and values. + + Order of assignment: + 1. all direct assignments + 2. assignments with conversion method + """ + cls = obj.__class__ + annot = get_annotations(cls) + errs: List[DataValidationError] = [] + + used_keys: Set[str] = set() + for name, python_type in annot.items(): + try: + if is_internal_field_name(name): + continue + + # populate field + if source is None: + self._assign_default(obj, name, python_type, object_path) + + # check for invalid configuration with both transformation function and default value + elif hasattr(obj, f"_{name}") and hasattr(obj, name): + raise RuntimeError( + f"Field '{obj.__class__.__name__}.{name}' has default value and transformation function at" + " the same time. That is now allowed. Store the default in the transformation function." + ) + + # there is a transformation function to create the value + elif hasattr(obj, f"_{name}") and callable(getattr(obj, f"_{name}")): + val = self._get_converted_value(obj, name, source, object_path) + self._assign_field(obj, name, python_type, val, object_path) + used_keys.add(name) + + # source just contains the value + elif name in source: + val = source[name] + self._assign_field(obj, name, python_type, val, object_path) + used_keys.add(name) + + # there is a default value, or the type is optional => store the default or null + elif hasattr(obj, name) or is_optional(python_type): + self._assign_default(obj, name, python_type, object_path) + + # we expected a value but it was not there + else: + errs.append(DataValidationError(f"missing attribute '{name}'.", object_path)) + except DataValidationError as e: + errs.append(e) + + if len(errs) == 1: + raise errs[0] + if len(errs) > 1: + raise AggregateDataValidationError(object_path, errs) + return used_keys + + def object_constructor(self, node: DataModelNode, source: dict[Any, Any], tree_path: str, base_path: Path) -> None: + # assign fields + used_keys = self._assign_fields(obj, source, object_path) + + # check for unused keys in the source object + if source and not isinstance(source, BaseSchema): + unused = source.keys() - used_keys + if len(unused) > 0: + keys = ", ".join(f"'{u}'" for u in unused) + raise DataValidationError( + f"unexpected extra key(s) {keys}", + object_path, + ) + + # validate the constructed value + try: + obj._validate() # noqa: SLF001 + except ValueError as e: + raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path or "/") from e + + # def _create_tuple(self, typ: Type[Any], obj: Tuple[Any, ...], object_path: str) -> Tuple[Any, ...]: + # types = get_generic_type_arguments(tp) + # errs: List[DataValidationError] = [] + # res: List[Any] = [] + # for i, (t, val) in enumerate(zip(types, obj)): + # try: + # res.append(self.map_object(t, val, object_path=f"{object_path}[{i}]")) + # except DataValidationError as e: + # errs.append(e) + # if len(errs) == 1: + # raise errs[0] + # if len(errs) > 1: + # raise AggregateDataValidationError(object_path, child_exceptions=errs) + # return tuple(res) + + # def _create_dict(self, tp: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]: + # key_type, val_type = get_generic_type_arguments(tp) + # try: + # errs: List[DataValidationError] = [] + # res: Dict[Any, Any] = {} + # for key, val in obj.items(): + # try: + # nkey = self.map_object(key_type, key, object_path=f"{object_path}[{key}]") + # nval = self.map_object(val_type, val, object_path=f"{object_path}[{key}]") + # res[nkey] = nval + # except DataValidationError as e: + # errs.append(e) + # if len(errs) == 1: + # raise errs[0] + # if len(errs) > 1: + # raise AggregateDataValidationError(object_path, child_exceptions=errs) + # except AttributeError as e: + # raise DataValidationError( + # f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path + # ) from e + # else: + # return res + + # def _create_list(self, tp: Type[Any], obj: List[Any], object_path: str) -> List[Any]: + # if isinstance(obj, str): + # raise DataValidationError("expected list, got string", object_path) + + # inner_type = get_generic_type_argument(tp) + # errs: List[DataValidationError] = [] + # res: List[Any] = [] + + # try: + # for i, val in enumerate(obj): + # res.append(self.map_object(inner_type, val, object_path=f"{object_path}[{i}]")) + # if len(res) == 0: + # raise DataValidationError("empty list is not allowed", object_path) + # except DataValidationError as e: + # errs.append(e) + # except TypeError as e: + # errs.append(DataValidationError(str(e), object_path)) + + # if len(errs) == 1: + # raise errs[0] + # if len(errs) > 1: + # raise AggregateDataValidationError(object_path, child_exceptions=errs) + # return res + + # def _create_str(self, obj: Any, object_path: str) -> str: + # # we are willing to cast any primitive value to string, but no compound values are allowed + # if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType): + # return str(obj) + # if is_obj_type(obj, bool): + # raise DataValidationError( + # "Expected str, found bool. Be careful, that YAML parsers consider even" + # ' "no" and "yes" as a bool. Search for the Norway Problem for more' + # " details. And please use quotes explicitly.", + # object_path, + # ) + # raise DataValidationError( + # f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path + # ) + + # def _create_int(self, obj: Any, object_path: str) -> int: + # # we don't want to make an int out of anything else than other int + # # except for BaseValueType class instances + # if is_obj_type(obj, int) or isinstance(obj, BaseValueType): + # return int(obj) + # raise DataValidationError(f"expected int, found {type(obj)}", object_path) + + # def _create_union(self, tp: Type[T], obj: Any, object_path: str) -> T: + # variants = get_generic_type_arguments(tp) + # errs: List[DataValidationError] = [] + # for v in variants: + # try: + # return self.map_object(v, obj, object_path=object_path) + # except DataValidationError as e: + # errs.append(e) + + # raise DataValidationError("could not parse any of the possible variants", object_path, child_exceptions=errs) + + # def _create_optional(self, tp: Type[Optional[T]], obj: Any, object_path: str) -> Optional[T]: + # inner: Type[Any] = get_optional_inner_type(tp) + # if obj is None: + # return None + # return self.map_object(inner, obj, object_path=object_path) + + # def _create_bool(self, obj: Any, object_path: str) -> bool: + # if is_obj_type(obj, bool): + # return obj + # raise DataValidationError(f"expected bool, found {type(obj)}", object_path) + + # def _create_literal(self, tp: Type[Any], obj: Any, object_path: str) -> Any: + # args = get_generic_type_arguments(tp) + + # expected = [] + # if sys.version_info < (3, 9): + # for arg in args: + # if is_literal(arg): + # expected += get_generic_type_arguments(arg) + # else: + # expected.append(arg) + # else: + # expected = args + + # if obj in expected: + # return obj + # raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path) + + # def _create_base_schema_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseSchema": + # if isinstance(obj, (dict, BaseSchema)): + # return tp(obj, object_path=object_path) + # raise DataValidationError(f"expected 'dict' or 'NoRenameBaseSchema' object, found '{type(obj)}'", object_path) + + # def create_value_type_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseValueType": + # if isinstance(obj, tp): + # # if we already have a custom value type, just pass it through + # return obj + # # no validation performed, the implementation does it in the constuctor + # try: + # return tp(obj, object_path=object_path) + # except ValueError as e: + # if len(e.args) > 0 and isinstance(e.args[0], str): + # msg = e.args[0] + # else: + # msg = f"Failed to validate value against {tp} type" + # raise DataValidationError(msg, object_path) from e + + # def _create_default(self, obj: Any) -> Any: + # if isinstance(obj, _LazyDefault): + # return obj.instantiate() + # return obj + + # def map_object( # noqa: C901, PLR0911, PLR0912 + # self, + # tp: Type[Any], + # obj: Any, + # default: Any = ..., + # use_default: bool = False, + # object_path: str = "/", + # ) -> Any: + # """ + # Given an expected type `cls` and a value object `obj`. + + # Return a new object of the given type and map fields of `obj` into it. + # During the mapping procedure, runtime type checking is performed. + # """ + # # Disabling these checks, because I think it's much more readable as a single function + # # and it's not that large at this point. If it got larger, then we should definitely split it + # # pylint: disable=too-many-branches,too-many-locals,too-many-statements + + # # default values + # if obj is None and use_default: + # return self._create_default(default) + + # # NoneType + # if is_none_type(tp): + # if obj is None: + # return None + # raise DataValidationError(f"expected None, found '{obj}'.", object_path) + + # # Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting) + # if is_optional(tp): + # return self._create_optional(tp, obj, object_path) + + # # Union[*variants] + # if is_union(tp): + # return self._create_union(tp, obj, object_path) + + # # after this, there is no place for a None object + # if obj is None: + # raise DataValidationError(f"unexpected value 'None' for type {tp}", object_path) + + # # int + # if tp is int: + # return self._create_int(obj, object_path) + + # # str + # if tp is str: + # return self._create_str(obj, object_path) + + # # bool + # if tp is bool: + # return self._create_bool(obj, object_path) + + # # float + # if tp is float: + # raise NotImplementedError( + # "Floating point values are not supported in the object mapper." + # " Please implement them and be careful with type coercions" + # ) + + # # Literal[T] + # if is_literal(tp): + # return self._create_literal(tp, obj, object_path) + + # # Dict[K,V] + # if is_dict(tp): + # return self._create_dict(tp, obj, object_path) + + # # any Enums (probably used only internally in DataValidator) + # if is_enum(tp): + # if isinstance(obj, tp): + # return obj + # raise DataValidationError(f"unexpected value '{obj}' for enum '{tp}'", object_path) + + # # List[T] + # if is_list(tp): + # return self._create_list(tp, obj, object_path) + + # # Tuple[A,B,C,D,...] + # if is_tuple(tp): + # return self._create_tuple(tp, obj, object_path) + + # # type of obj and cls type match + # if is_obj_type(obj, tp): + # return obj + + # # when the specified type is Any, just return the given value + # # on mypy version 1.11.0 comparison-overlap error started popping up + # # https://github.com/python/mypy/issues/17665 + # if tp == Any: # type: ignore[comparison-overlap] + # return obj + + # # BaseValueType subclasses + # if inspect.isclass(tp) and issubclass(tp, BaseValueType): + # return self.create_value_type_object(tp, obj, object_path) + + # # BaseGenericTypeWrapper subclasses + # if is_generic_type_wrapper(tp): + # inner_type = get_generic_type_wrapper_argument(tp) + # obj_valid = self.map_object(inner_type, obj, object_path) + # return tp(obj_valid, object_path=object_path) + + # # nested BaseSchema subclasses + # if inspect.isclass(tp) and issubclass(tp, BaseSchema): + # return self._create_base_schema_object(tp, obj, object_path) + + # # if the object matches, just pass it through + # if inspect.isclass(tp) and isinstance(obj, tp): + # return obj + + # # default error handler + # raise DataValidationError( + # f"Type {tp} cannot be parsed. This is a implementation error. " + # "Please fix your types in the class or improve the parser/validator.", + # object_path, + # ) + + # def is_obj_type_valid(self, obj: Any, tp: Type[Any]) -> bool: + # """Runtime type checking. Validate, that a given object is of a given type.""" + # try: + # self.map_object(tp, obj) + # except (DataValidationError, ValueError): + # return False + # else: + # return True + + # def _assign_default(self, obj: Any, name: str, python_type: Any, object_path: str) -> None: + # cls = obj.__class__ + + # try: + # default = self._create_default(getattr(cls, name, None)) + # except ValueError as e: + # raise DataValidationError(str(e), f"{object_path}/{name}") from e + + # value = self.map_object(python_type, default, object_path=f"{object_path}/{name}") + # setattr(obj, name, value) + + # def _get_converted_value(self, obj: Any, key: str, source: TSource, object_path: str) -> Any: + # """Get a value of a field by invoking appropriate transformation function.""" + # try: + # func = getattr(obj.__class__, f"_{key}") + # argc = len(inspect.signature(func).parameters) + # if argc == 1: + # # it is a static method + # return func(source) + # if argc == 2: + # # it is a instance method + # return func(_create_untouchable("obj"), source) + # raise RuntimeError("Transformation function has wrong number of arguments") + # except ValueError as e: + # msg = e.args[0] if len(e.args) > 0 and isinstance(e.args[0], str) else "Failed to validate value type" + # raise DataValidationError(msg, object_path) from e + + # def object_constructor(self, obj: Any, source: Union["BaseSchema", Dict[Any, Any]], object_path: str) -> None: + # """ + # Construct object. Delegated constructor for the NoRenameBaseSchema class. + + # The reason this method is delegated to the mapper is due to renaming. Like this, we don't have to + # worry about a different BaseSchema class, when we want to have dynamically renamed fields. + # """ + # # As this is a delegated constructor, we must ignore protected access warnings + + # # sanity check + # if not isinstance(source, (BaseSchema, dict)): + # raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path) + + # # construct lower level schema first if configured to do so + # if obj._LAYER is not None: # noqa: SLF001 + # source = obj._LAYER(source, object_path=object_path) # pylint: disable=not-callable # noqa: SLF001 + + # # assign fields + # used_keys = self._assign_fields(obj, source, object_path) + + # # check for unused keys in the source object + # if source and not isinstance(source, BaseSchema): + # unused = source.keys() - used_keys + # if len(unused) > 0: + # keys = ", ".join(f"'{u}'" for u in unused) + # raise DataValidationError( + # f"unexpected extra key(s) {keys}", + # object_path, + # ) + + # # validate the constructed value + # try: + # obj._validate() # noqa: SLF001 + # except ValueError as e: + # raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path or "/") from e diff --git a/python/knot_resolver/utils/modeling/data_model.py b/python/knot_resolver/utils/modeling/data_model.py new file mode 100644 index 000000000..ca1458b10 --- /dev/null +++ b/python/knot_resolver/utils/modeling/data_model.py @@ -0,0 +1,317 @@ +from __future__ import annotations + +import enum +import inspect +from pathlib import Path +from typing import TYPE_CHECKING, Any, TypeVar + +from .context import Context, Strictness +from .errors import DataDescriptionError +from .parsing import ParsedData, ParsedDataWrapper +from .types.base_custom_type import BaseCustomType +from .types.inspect import ( + get_annotations, + get_base_generic_type_wrapper_argument, + get_optional_inner_type, + is_base_generic_type_wrapper, + is_dict, + is_list, + is_literal, + is_none_type, + is_optional, + is_union, +) + +if TYPE_CHECKING: + from .pointer import JSONPointer + + +# def _split_docstring(docstring: str) -> tuple[str, str] | None: +# if "---" not in docstring: +# return ("\n".join([s.strip() for s in docstring.splitlines()]).strip(), None) + +# doc, attrs_doc = docstring.split("---", maxsplit=1) +# return ( +# "\n".join([s.strip() for s in doc.splitlines()]).strip(), +# attrs_doc, +# ) + + +# def _describe_type(typ: type[Any]) -> dict[Any, Any]: +# if is_none_type(typ): +# return {"type": "null"} +# if typ is int: +# return {"type": "integer"} +# if typ is bool: +# return {"type": "boolean"} +# if typ is str: +# return {"type": "string"} + +# if inspect.isclass(typ) and issubclass(typ, DataModel): +# return typ.json_schema() + +# if inspect.isclass(typ) and issubclass(typ, BaseCustomType): +# return typ.json_schema() + +# if is_base_generic_type_wrapper(typ): +# wrapped = get_base_generic_type_wrapper_argument(typ) +# return _describe_type(wrapped) + +# if is_literal(typ): +# lit: list[str] = [] +# args = inspect.get_args(typ) +# for arg in args: +# if is_literal(arg): +# lit += inspect.get_args(arg) +# else: +# lit.append(arg) +# return {"type": "string", "enum": lit} + +# if is_optional(typ): +# desc = _describe_type(get_optional_inner_type(typ)) +# if "type" in desc: +# desc["type"] = [desc["type"], "null"] +# return desc +# return {"anyOf": [{"type": "null"}, desc]} + +# if is_union(typ): +# variants = inspect.get_args(typ) +# return {"anyOf": [_describe_type(v) for v in variants]} + +# if is_list(typ): +# return {"type": "array", "items": _describe_type(inspect.get_args(typ)[0])} + +# if is_dict(typ): +# key, value = inspect.get_args(typ) + +# if inspect.isclass(key) and issubclass(key, BaseCustomType): +# assert ( +# key.__str__ is not BaseCustomType.__str__ +# ), "To support derived 'BaseValueType', __str__ must be implemented." +# else: +# assert key is str, "We currently do not support any other keys then strings" + +# return {"type": "object", "additionalProperties": _describe_type(value)} + +# msg = f"JSON schema for type '{typ}' is not implemented" +# raise NotImplementedError(msg) + + +# def _get_attrs_docs(docstring: str) -> dict[str, str] | None: +# attrs_docs = _split_docstring(docstring)[1] +# if attrs_docs is None: +# return None +# return parse_yaml(attrs_docs) + + +def _get_json_schema_description(model_type: type[DataModel]) -> str: + return "" + + +# docstring = inspect.get_doc(node_type) +# if not docstring: +# msg = f"missing docstring for '{node_type}" +# raise DataDescriptionError(msg, node_path) +# return _split_docstring(docstring)[0] + + +def _get_json_schema_properties(model_type: type[DataModel]) -> dict[Any, Any]: + return {} + + +# schema: dict[str, Any] = {} + +# docstring = inspect.get_doc(model_type) +# if not docstring: +# msg = f"missing docstring for '{model_type}'" +# raise DataDescriptionError(msg) +# attrs_docs = _get_attrs_docs(docstring) + +# annotations: dict[str, Any] = get_annotations(model_type) +# for attr_name, attr_type in annotations.items(): +# name = attr_name.replace("_", "-") +# schema[name] = _describe_type(attr_type) + +# description = attrs_docs.pop(attr_name, None) +# if description is None: +# msg = f"missing description for '{attr_name}' in docstring for '{model_type}'" +# raise DataDescriptionError(msg) +# schema[name]["description"] = description + +# default = getattr(model_type, attr_name, None) +# if default: +# schema[name]["default"] = default + +# if attrs_docs: +# msg = f"additional description in '{model_type}' docstring: {tuple(attrs_docs.keys())}" +# raise DataDescriptionError(msg) + +# return schema + + +# def data_combine(data1: DataModel, data2: DataModel, *data: DataModel) -> DataModel: +# result = + +# for arg in args: +# data_combine() + + +def _assign_type(): + pass + + +def _assign_attributes(self: DataModel) -> None: + source: ParsedData = self._source + annot: dict[str, Any] = get_annotations(type(self)) + + if isinstance(source, ParsedDataWrapper): + base_path = source.file.parent + source = source.data + + if isinstance(source, dict): + + + for key, value in source.items(): + _key = key.replace("-", "_") + + if _key in annot: + _type = annot[key] + setattr(self, _key, _type(source[key])) + + + +class DataModel: + """Represents the data model and the source data. + + The source data is stored in its original form and is simply organized into the attributes of the data model. + For advanced validation, first you need to assign_defaults() and then call the validate() method. + + Attributes: + source (ParsedData | None): Source data in the dictionary. + pointer (JSONPointer): A JSON pointer that indicates the current position in the data model subtree. + base_path (str | Path): The base path for files and directories relative paths. + + """ + + def __init__( + self, + source: ParsedData | None = None, + pointer: JSONPointer = "/", + base_path: str | Path = Path(), + ) -> None: + self._source = source or {} + self._validated: bool = False + self._pointer = pointer + self._base_path = Path(base_path) + _assign_attributes(self) + + # def append(self, additional_data: DataModel) -> None: + # def data_combine(data1: DataModel, data2: DataModel) -> None: + # if type(data1) is not type(data2): + # # error here + # pass + + # annot: dict[str, Any] = get_annotations(type(self)) + # for attr_name, attr_type in annot.items(): + # name = attr_name.replace("_", "-") + + # if not hasattr(self, attr_name) and hasattr(additional_data, attr_name): + # attr_value = getattr(additional_data, attr_name) + # setattr(self, attr_name, attr_value) + + # if inspect.isclass(attr_type) and issubclass(attr_type, DataModel): + # if hasattr(data1, attr_name) and hasattr(data2, attr_name): + # data_combine(getattr(data1, attr_name), getattr(data2, attr_name)) + + # data_combine(self, additional_data) + + def assign_defaults(self) -> None: + """Assign default values to missing attributes. + + A value is assigned if attribute is missing and default value is available for it. + Without default values, data validation may fail even if the data is valid. + """ + cls = type(self) + annot: dict[str, Any] = get_annotations(cls) + + for key in annot: + if not hasattr(key, key) and hasattr(cls, key): + default = getattr(cls, key) + setattr(self, key, default) + + def _validate_subtree(self, context: Context) -> None: + for attr_value in vars(self).values(): + if isinstance(attr_value, BaseCustomType): + attr_value.validate(context) + + def _validate(self, context: Context) -> None: + """Validate all data under the node subtree. + + All validation should be done here. + This method is automatically called during validation. + """ + + def validate(self, context: Context | None = None) -> None: + """Validate all data under the node subtree. + + At least BASIC validation strictness is required + for the data value to be considered valid. + If validation is successful, no error is raised. + + Args: + context (Context | None): + Optional, validation context for the validation operations, e.g. validation strictness + or username and groupname to check permissions on paths. + If set to None, Context with STRICT validation strictness is used. + + Raises: + DataTypeError: When input data type validation fails. + DataValueError: When input data value validation fails. + DataValidationError: When some other input data validation fails. + + """ + if context is None: + # STRICT validation is used by default + context = Context(strictness=Strictness.STRICT) + + self._validate_subtree(context) + self._validate(context) + if context.strictness > Strictness.NORMAL: + self._validated = True + + @classmethod + def json_schema_minimal(cls) -> dict[str, Any]: + """Get minimal JSON schema without any additional metadata. + + Returns: + Minimal JSON schema definition of the entire data model subtree without any additional metadata. + + """ + return { + "type": "object", + "description": _get_json_schema_description(cls), + "properties": _get_json_schema_properties(cls), + } + + @classmethod + def json_schema(cls, schema_id: str, schema_title: str, schema_description: str) -> dict[str, Any]: + """Get JSON schema with metadata. + + Args: + schema_id (str): Unique URI for the JSON schema. + schema_title (str): Title for the JSON schema. + schema_description (str): Description for the JSON schema. + + Returns: + Proper JSON schema definition of the entire data model subtree + with additional metadata ($schema, $id, title and description). + + """ + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": schema_id, + "title": schema_title, + "type": "object", + "description": schema_description, + "properties": _get_json_schema_properties(cls), + } diff --git a/python/knot_resolver/utils/modeling/data_store.py b/python/knot_resolver/utils/modeling/data_store.py new file mode 100644 index 000000000..04d0730ca --- /dev/null +++ b/python/knot_resolver/utils/modeling/data_store.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from asyncio import Lock +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from .parsing import try_to_parse_file + +if TYPE_CHECKING: + from .data_model import DataModel + + +class DataStore: + """Used to load, store and manage a data. + + Attributes: + files (list[str | Path]): + A list of configuration files from which the configuration should be loaded. + + """ + + def __init__(self, files: list[str | Path], model: type[DataModel]) -> None: + self._model = model + self._files = files + + # self._verifier: list[Any] = [] + # self._callbacks: list[Any] = [] + self._lock: Lock = Lock() + + def load_files(self) -> None: + data: DataModel = self._model() + for file in self._files: + file_path = Path(file) + file_parsed_data = try_to_parse_file(file_path) + + base_path = file_path.parent + file_data: DataModel = self._model(file_parsed_data, base_path=base_path) + # combined_data.append(file_data) + + raise ValueError + # config.validate() diff --git a/python/knot_resolver/utils/modeling/parsing.py b/python/knot_resolver/utils/modeling/parsing.py index f9df10e88..3e7eb3e1e 100644 --- a/python/knot_resolver/utils/modeling/parsing.py +++ b/python/knot_resolver/utils/modeling/parsing.py @@ -31,7 +31,9 @@ class ParsedDataWrapper: self.file = Path(file) -ParsedData = Union[Dict[str, "ParsedData"], List["ParsedData"], ParsedDataWrapper, str, int, float, bool, None] +ParsedDataBase = Union[List["ParsedDataBase"], ParsedDataWrapper, str, int, float, bool, None] + +ParsedData = Union[Dict[str, "ParsedDataBase"], ParsedDataWrapper] def _yaml_include_constructor(self: _YAMLRaiseDuplicatesIncludeLoader, node: MappingNode) -> ParsedDataWrapper: diff --git a/python/knot_resolver/utils/modeling/pointer.py b/python/knot_resolver/utils/modeling/pointer.py new file mode 100644 index 000000000..aca4a5650 --- /dev/null +++ b/python/knot_resolver/utils/modeling/pointer.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +JSONPointer = str + + +# /path/to/config.yaml::cache/size: +# /api::cache:size: diff --git a/tests/python/knot_resolver/utils/modeling/config.test.json b/tests/python/knot_resolver/utils/modeling/config.test.json new file mode 100644 index 000000000..5bd7d9c97 --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/config.test.json @@ -0,0 +1,3 @@ +{ + "test": "this is test" +} diff --git a/tests/python/knot_resolver/utils/modeling/config.test.yaml b/tests/python/knot_resolver/utils/modeling/config.test.yaml new file mode 100644 index 000000000..0755ab55e --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/config.test.yaml @@ -0,0 +1 @@ +test: this not is test diff --git a/tests/python/knot_resolver/utils/modeling/test_data_model.py b/tests/python/knot_resolver/utils/modeling/test_data_model.py new file mode 100644 index 000000000..4a5338587 --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/test_data_model.py @@ -0,0 +1,113 @@ +import pytest + +# from typing import Any + +from knot_resolver.utils.modeling.data_model import DataModel +# from knot_resolver.utils.modeling.errors import DataDescriptionError +from knot_resolver.utils.modeling.parsing import try_to_parse_file + + + +class TestModel(DataModel): + test: str = "default value" + + + + + + +# class NoDescription(DataModelNode): +# pass + + +# class SingleLineDescription(DataModelNode): +# """ +# Single line description. +# """ + + +# class MultiLineDescription(DataModelNode): +# """ +# Multi line +# description. +# """ + + +# class AttributesDescription(DataModelNode): +# """ +# Data model node description. +# --- +# integer_attr: Description for the integer attribute. +# string_attr: Description for the string attribute. +# """ + +# integer: int +# string: str + + +@pytest.mark.parametrize("model", [TestModel]) +def test_data_model(model: DataModel): + parsed_data = try_to_parse_file("/home/amrazek/src/knot-resolver/tests/python/knot_resolver/utils/modeling/config.test.yaml") + modeled = TestModel(parsed_data) + + assert modeled.test == "this is test" + + +# @pytest.mark.parametrize("model", [AttributesDescription]) +# def test_json_schema(model: Any): +# schema = model.json_schema() + + +# @pytest.mark.parametrize("model", [NoDescription, SingleLineDescription, MultiLineDescription]) +# def test_json_schema_invalid(model: Any): +# with pytest.raises(DataDescriptionError): +# model.json_schema() + + +# # class FieldsDescription(ConfigSchema): +# # """ +# # This is an awesome test class +# # --- +# # field: This field does nothing interesting +# # value: Neither does this +# # """ + +# # field: str +# # value: int + +# # schema = FieldsDescription.json_schema() +# # assert schema["description"] == "This is an awesome test class" +# # assert schema["properties"]["field"]["description"] == "This field does nothing interesting" +# # assert schema["properties"]["value"]["description"] == "Neither does this" + +# # class NoDescription(ConfigSchema): +# # nothing: str + +# # _ = NoDescription.json_schema() + + +# # def test_docstring_parsing_invalid(): +# # class AdditionalItem(ConfigSchema): +# # """ +# # This class is wrong +# # --- +# # field: nope +# # nothing: really nothing +# # """ + +# # nothing: str + +# # with raises(DataDescriptionError): +# # _ = AdditionalItem.json_schema() + +# # class WrongDescription(ConfigSchema): +# # """ +# # This class is wrong +# # --- +# # other: description +# # """ + +# # nothing: str + +# # with raises(DataDescriptionError): +# # _ = WrongDescription.json_schema() diff --git a/tests/python/knot_resolver/utils/modeling/test_data_store.py b/tests/python/knot_resolver/utils/modeling/test_data_store.py new file mode 100644 index 000000000..84cda2701 --- /dev/null +++ b/tests/python/knot_resolver/utils/modeling/test_data_store.py @@ -0,0 +1,20 @@ +from pathlib import Path + +import pytest + +from knot_resolver.utils.modeling import DataModel, DataStore + +base_path = Path(__file__).parent + + +class TestModel(DataModel): + pass + + +# def test_data_store_load_files() -> None: +# files = [ +# base_path / "config.test.yaml", +# base_path / "config.test.json", +# ] +# data_store = DataStore(files, TestModel) +# data_store.load_files()