From 1f0f14c51f393aea0604f28e95e441240a8a945b Mon Sep 17 00:00:00 2001 From: Vasek Sraier Date: Wed, 8 Mar 2023 13:34:56 +0100 Subject: [PATCH] manager: datamodel: better representation of paths and better error messages --- .../datamodel/cache_schema.py | 6 +- .../datamodel/config_schema.py | 6 +- .../datamodel/logging_schema.py | 4 +- .../datamodel/management_schema.py | 4 +- .../datamodel/network_schema.py | 13 +-- .../datamodel/policy_schema.py | 4 +- .../datamodel/rpz_schema.py | 4 +- .../datamodel/static_hints_schema.py | 6 +- .../datamodel/types/__init__.py | 12 ++- .../datamodel/types/types.py | 96 ++++++++++++++++++- .../datamodel/webmgmt_schema.py | 8 +- .../datamodel/templates/test_common_macros.py | 3 +- .../unit/datamodel/test_management_schema.py | 4 +- .../unit/datamodel/test_network_schema.py | 3 +- .../unit/datamodel/types/test_custom_types.py | 4 +- 15 files changed, 134 insertions(+), 43 deletions(-) diff --git a/manager/knot_resolver_manager/datamodel/cache_schema.py b/manager/knot_resolver_manager/datamodel/cache_schema.py index 831826274..6813c6855 100644 --- a/manager/knot_resolver_manager/datamodel/cache_schema.py +++ b/manager/knot_resolver_manager/datamodel/cache_schema.py @@ -1,6 +1,6 @@ from typing import List, Optional -from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, SizeUnit, TimeUnit +from knot_resolver_manager.datamodel.types import Dir, DomainName, File, SizeUnit, TimeUnit from knot_resolver_manager.utils.modeling import BaseSchema @@ -18,7 +18,7 @@ class PrefillSchema(BaseSchema): origin: DomainName url: str refresh_interval: TimeUnit = TimeUnit("1d") - ca_file: Optional[CheckedPath] = None + ca_file: Optional[File] = None def _validate(self) -> None: if str(self.origin) != ".": @@ -40,7 +40,7 @@ class CacheSchema(BaseSchema): """ garbage_collector: bool = True - storage: CheckedPath = CheckedPath("/var/cache/knot-resolver") + storage: Dir = Dir("/var/cache/knot-resolver") size_max: SizeUnit = SizeUnit("100M") ttl_min: TimeUnit = TimeUnit("5s") ttl_max: TimeUnit = TimeUnit("6d") diff --git a/manager/knot_resolver_manager/datamodel/config_schema.py b/manager/knot_resolver_manager/datamodel/config_schema.py index d40f225d3..3d8bd5fa7 100644 --- a/manager/knot_resolver_manager/datamodel/config_schema.py +++ b/manager/knot_resolver_manager/datamodel/config_schema.py @@ -23,7 +23,7 @@ from knot_resolver_manager.datamodel.rpz_schema import RPZSchema from knot_resolver_manager.datamodel.slice_schema import SliceSchema from knot_resolver_manager.datamodel.static_hints_schema import StaticHintsSchema from knot_resolver_manager.datamodel.stub_zone_schema import StubZoneSchema -from knot_resolver_manager.datamodel.types.types import IntPositive, UncheckedPath +from knot_resolver_manager.datamodel.types import AbsoluteDir, IntPositive from knot_resolver_manager.datamodel.view_schema import ViewSchema from knot_resolver_manager.datamodel.webmgmt_schema import WebmgmtSchema from knot_resolver_manager.utils.modeling import BaseSchema @@ -112,7 +112,7 @@ class KresConfig(BaseSchema): version: int = 1 nsid: Optional[str] = None hostname: Optional[str] = None - rundir: UncheckedPath = UncheckedPath(".") + rundir: AbsoluteDir = AbsoluteDir("/var/run/knot-resolver") workers: Union[Literal["auto"], IntPositive] = IntPositive(1) max_workers: IntPositive = IntPositive(_default_max_worker_count()) management: ManagementSchema = ManagementSchema({"unix-socket": "./manager.sock"}) @@ -137,7 +137,7 @@ class KresConfig(BaseSchema): nsid: Optional[str] hostname: str - rundir: UncheckedPath + rundir: AbsoluteDir workers: IntPositive max_workers: IntPositive management: ManagementSchema diff --git a/manager/knot_resolver_manager/datamodel/logging_schema.py b/manager/knot_resolver_manager/datamodel/logging_schema.py index 1a107019c..1ba395685 100644 --- a/manager/knot_resolver_manager/datamodel/logging_schema.py +++ b/manager/knot_resolver_manager/datamodel/logging_schema.py @@ -3,7 +3,7 @@ from typing import Any, List, Optional, Set, Type, Union, cast from typing_extensions import Literal -from knot_resolver_manager.datamodel.types import CheckedPath, TimeUnit +from knot_resolver_manager.datamodel.types import FilePath, TimeUnit from knot_resolver_manager.utils.modeling import BaseSchema from knot_resolver_manager.utils.modeling.base_schema import is_obj_type_valid @@ -80,7 +80,7 @@ class DnstapSchema(BaseSchema): log_tcp_rtt: Log TCP RTT (Round-trip time). """ - unix_socket: CheckedPath + unix_socket: FilePath log_queries: bool = True log_responses: bool = True log_tcp_rtt: bool = True diff --git a/manager/knot_resolver_manager/datamodel/management_schema.py b/manager/knot_resolver_manager/datamodel/management_schema.py index e58f1a728..80c0efafa 100644 --- a/manager/knot_resolver_manager/datamodel/management_schema.py +++ b/manager/knot_resolver_manager/datamodel/management_schema.py @@ -1,6 +1,6 @@ from typing import Optional -from knot_resolver_manager.datamodel.types import CheckedPath, IPAddressPort +from knot_resolver_manager.datamodel.types import FilePath, IPAddressPort from knot_resolver_manager.utils.modeling import BaseSchema @@ -13,7 +13,7 @@ class ManagementSchema(BaseSchema): interface: IP address and port number to listen to. """ - unix_socket: Optional[CheckedPath] = None + unix_socket: Optional[FilePath] = None interface: Optional[IPAddressPort] = None def _validate(self) -> None: diff --git a/manager/knot_resolver_manager/datamodel/network_schema.py b/manager/knot_resolver_manager/datamodel/network_schema.py index 3bdad9a50..7667bbd96 100644 --- a/manager/knot_resolver_manager/datamodel/network_schema.py +++ b/manager/knot_resolver_manager/datamodel/network_schema.py @@ -3,7 +3,8 @@ from typing import List, Optional, Union from typing_extensions import Literal from knot_resolver_manager.datamodel.types import ( - CheckedPath, + File, + FilePath, Int0_512, Int0_65535, InterfaceOptionalPort, @@ -58,10 +59,10 @@ class TLSSchema(BaseSchema): padding: EDNS(0) padding of answers to queries that arrive over TLS transport. """ - cert_file: Optional[CheckedPath] = None - key_file: Optional[CheckedPath] = None + cert_file: Optional[File] = None + key_file: Optional[File] = None sticket_secret: Optional[str] = None - sticket_secret_file: Optional[CheckedPath] = None + sticket_secret_file: Optional[File] = None auto_discovery: bool = False padding: Union[bool, Int0_512] = True @@ -84,7 +85,7 @@ class ListenSchema(BaseSchema): """ interface: Union[None, InterfaceOptionalPort, List[InterfaceOptionalPort]] = None - unix_socket: Union[None, CheckedPath, List[CheckedPath]] = None + unix_socket: Union[None, FilePath, List[FilePath]] = None port: Optional[PortNumber] = None kind: KindEnum = "dns" freebind: bool = False @@ -92,7 +93,7 @@ class ListenSchema(BaseSchema): _LAYER = Raw interface: Union[None, InterfaceOptionalPort, List[InterfaceOptionalPort]] - unix_socket: Union[None, CheckedPath, List[CheckedPath]] + unix_socket: Union[None, FilePath, List[FilePath]] port: Optional[PortNumber] kind: KindEnum freebind: bool diff --git a/manager/knot_resolver_manager/datamodel/policy_schema.py b/manager/knot_resolver_manager/datamodel/policy_schema.py index f081879b0..072406a0e 100644 --- a/manager/knot_resolver_manager/datamodel/policy_schema.py +++ b/manager/knot_resolver_manager/datamodel/policy_schema.py @@ -2,9 +2,9 @@ from typing import List, Optional, Union from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema from knot_resolver_manager.datamodel.types import ( - CheckedPath, DNSRecordTypeEnum, DomainName, + File, IPAddressOptionalPort, PolicyActionEnum, PolicyFlagEnum, @@ -59,7 +59,7 @@ class ForwardServerSchema(BaseSchema): address: IPAddressOptionalPort pin_sha256: Optional[Union[str, List[str]]] = None hostname: Optional[DomainName] = None - ca_file: Optional[CheckedPath] = None + ca_file: Optional[File] = None def _validate_policy_action(policy_action: Union["ActionSchema", "PolicySchema"]) -> None: diff --git a/manager/knot_resolver_manager/datamodel/rpz_schema.py b/manager/knot_resolver_manager/datamodel/rpz_schema.py index a0032b70b..050eeed1a 100644 --- a/manager/knot_resolver_manager/datamodel/rpz_schema.py +++ b/manager/knot_resolver_manager/datamodel/rpz_schema.py @@ -1,6 +1,6 @@ from typing import List, Optional -from knot_resolver_manager.datamodel.types import CheckedPath, PolicyActionEnum, PolicyFlagEnum +from knot_resolver_manager.datamodel.types import File, PolicyActionEnum, PolicyFlagEnum from knot_resolver_manager.utils.modeling import BaseSchema @@ -18,7 +18,7 @@ class RPZSchema(BaseSchema): """ action: PolicyActionEnum - file: CheckedPath + file: File watch: bool = True views: Optional[List[str]] = None options: Optional[List[PolicyFlagEnum]] = None diff --git a/manager/knot_resolver_manager/datamodel/static_hints_schema.py b/manager/knot_resolver_manager/datamodel/static_hints_schema.py index 1a45d9a1f..409526301 100644 --- a/manager/knot_resolver_manager/datamodel/static_hints_schema.py +++ b/manager/knot_resolver_manager/datamodel/static_hints_schema.py @@ -1,6 +1,6 @@ from typing import Dict, List, Optional -from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, IPAddress, TimeUnit +from knot_resolver_manager.datamodel.types import DomainName, File, IPAddress, TimeUnit from knot_resolver_manager.utils.modeling import BaseSchema @@ -22,6 +22,6 @@ class StaticHintsSchema(BaseSchema): nodata: bool = True etc_hosts: bool = False root_hints: Optional[Dict[DomainName, List[IPAddress]]] = None - root_hints_file: Optional[CheckedPath] = None + root_hints_file: Optional[File] = None hints: Optional[Dict[DomainName, List[IPAddress]]] = None - hints_files: Optional[List[CheckedPath]] = None + hints_files: Optional[List[File]] = None diff --git a/manager/knot_resolver_manager/datamodel/types/__init__.py b/manager/knot_resolver_manager/datamodel/types/__init__.py index ccb0490c9..225a8f444 100644 --- a/manager/knot_resolver_manager/datamodel/types/__init__.py +++ b/manager/knot_resolver_manager/datamodel/types/__init__.py @@ -1,7 +1,10 @@ from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum from .types import ( - CheckedPath, + AbsoluteDir, + Dir, DomainName, + File, + FilePath, Int0_512, Int0_65535, InterfaceName, @@ -19,14 +22,12 @@ from .types import ( PortNumber, SizeUnit, TimeUnit, - UncheckedPath, ) __all__ = [ "PolicyActionEnum", "PolicyFlagEnum", "DNSRecordTypeEnum", - "CheckedPath", "DomainName", "Int0_512", "Int0_65535", @@ -45,5 +46,8 @@ __all__ = [ "PortNumber", "SizeUnit", "TimeUnit", - "UncheckedPath", + "AbsoluteDir", + "File", + "FilePath", + "Dir", ] diff --git a/manager/knot_resolver_manager/datamodel/types/types.py b/manager/knot_resolver_manager/datamodel/types/types.py index 43e47a0ba..d9f725acd 100644 --- a/manager/knot_resolver_manager/datamodel/types/types.py +++ b/manager/knot_resolver_manager/datamodel/types/types.py @@ -1,7 +1,7 @@ import ipaddress import re from pathlib import Path -from typing import Any, Dict, Optional, Type, Union +from typing import Any, Dict, Optional, Tuple, Type, TypeVar, Union from knot_resolver_manager.datamodel.types.base_types import IntRangeBase, PatternBase, StrBase, UnitBase from knot_resolver_manager.utils.modeling import BaseValueType @@ -409,11 +409,20 @@ class UncheckedPath(BaseValueType): _value: Path - def __init__(self, source_value: Any, object_path: str = "/") -> None: + def __init__( + self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" + ) -> None: + super().__init__(source_value, object_path=object_path) + self._object_path: str = object_path + self._parents: Tuple[UncheckedPath, ...] = parents if isinstance(source_value, str): self._raw_value: str = source_value - self._value: Path = Path(source_value) + if self._parents: + pp = map(lambda p: p.to_path(), self._parents) + self._value: Path = Path(*pp, source_value) + else: + self._value: Path = Path(source_value) else: raise ValueError(f"expected file path in a string, got '{source_value}' with type '{type(source_value)}'.") @@ -435,6 +444,18 @@ class UncheckedPath(BaseValueType): def serialize(self) -> Any: return self._raw_value + def relative_to(self, parent: "UncheckedPath") -> "UncheckedPath": + """return a path with an added parent part""" + return UncheckedPath(self._raw_value, parents=(parent, *self._parents), object_path=self._object_path) + + UPT = TypeVar("UPT", bound="UncheckedPath") + + def reconstruct(self, cls: Type[UPT]) -> UPT: + """ + Rebuild this object as an instance of its subclass. Practically, allows for conversions from + """ + return cls(self._raw_value, parents=self._parents, object_path=self._object_path) + @classmethod def json_schema(cls: Type["UncheckedPath"]) -> Dict[Any, Any]: return { @@ -442,14 +463,79 @@ class UncheckedPath(BaseValueType): } +class Dir(UncheckedPath): + """ + Path, that is enforced to be: + - an existing directory + """ + + def __init__( + self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" + ) -> None: + super().__init__(source_value, parents=parents, object_path=object_path) + if not self._value.is_dir(): + raise ValueError("path does not point to an existing directory") + + +class AbsoluteDir(Dir): + """ + Path, that is enforced to be: + - absolute + - an existing directory + """ + + def __init__( + self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" + ) -> None: + super().__init__(source_value, parents=parents, object_path=object_path) + if not self._value.is_absolute(): + raise ValueError("path not absolute") + + +class File(UncheckedPath): + """ + Path, that is enforced to be: + - an existing file + """ + + def __init__( + self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" + ) -> None: + super().__init__(source_value, parents=parents, object_path=object_path) + if not self._value.exists(): + raise ValueError("file does not exist") + if not self._value.is_file(): + raise ValueError("path is not a file") + + +class FilePath(UncheckedPath): + """ + Path, that is enforced to be: + - parent of the last path segment is an existing directory + - it does not point to a dir + """ + + def __init__( + self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" + ) -> None: + super().__init__(source_value, parents=parents, object_path=object_path) + p = self._value.parent + if not p.exists() or not p.is_dir(): + raise ValueError("path does not point inside an existing directory") + if self._value.is_dir(): + raise ValueError("path points to a directory when we expected a file") + + class CheckedPath(UncheckedPath): """ Like UncheckedPath, but the file path is checked for being valid. So no non-existent directories in the middle, no symlink loops. This however means, that resolving of relative path happens while validating. """ - def __init__(self, source_value: Any, object_path: str = "/") -> None: - super().__init__(source_value, object_path=object_path) + def __init__( + self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" + ) -> None: + super().__init__(source_value, parents=parents, object_path=object_path) try: self._value = self._value.resolve(strict=False) except RuntimeError as e: diff --git a/manager/knot_resolver_manager/datamodel/webmgmt_schema.py b/manager/knot_resolver_manager/datamodel/webmgmt_schema.py index c99ddaa06..f8174f22b 100644 --- a/manager/knot_resolver_manager/datamodel/webmgmt_schema.py +++ b/manager/knot_resolver_manager/datamodel/webmgmt_schema.py @@ -1,6 +1,6 @@ from typing import Optional -from knot_resolver_manager.datamodel.types import CheckedPath, InterfacePort +from knot_resolver_manager.datamodel.types import File, FilePath, InterfacePort from knot_resolver_manager.utils.modeling import BaseSchema @@ -16,11 +16,11 @@ class WebmgmtSchema(BaseSchema): key_file: Path to certificate key. """ - unix_socket: Optional[CheckedPath] = None + unix_socket: Optional[FilePath] = None interface: Optional[InterfacePort] = None tls: bool = False - cert_file: Optional[CheckedPath] = None - key_file: Optional[CheckedPath] = None + cert_file: Optional[File] = None + key_file: Optional[File] = None def _validate(self) -> None: if bool(self.unix_socket) == bool(self.interface): diff --git a/manager/tests/unit/datamodel/templates/test_common_macros.py b/manager/tests/unit/datamodel/templates/test_common_macros.py index 18f1ffd5d..6cb240503 100644 --- a/manager/tests/unit/datamodel/templates/test_common_macros.py +++ b/manager/tests/unit/datamodel/templates/test_common_macros.py @@ -49,7 +49,8 @@ def test_servers_table(): def test_tls_servers_table(): d = ForwardServerSchema( - {"address": "2001:DB8::d0c", "hostname": "res.example.com", "ca-file": "/etc/knot-resolver/tlsca.crt"} + # the ca-file is a dummy, because it's existence is checked + {"address": "2001:DB8::d0c", "hostname": "res.example.com", "ca-file": "/etc/passwd"} ) t = [d, ForwardServerSchema({"address": "192.0.2.1", "pin-sha256": "YQ=="})] tmpl_str = """{% from 'macros/common_macros.lua.j2' import tls_servers_table %} diff --git a/manager/tests/unit/datamodel/test_management_schema.py b/manager/tests/unit/datamodel/test_management_schema.py index 1d3d90ca0..870e7208d 100644 --- a/manager/tests/unit/datamodel/test_management_schema.py +++ b/manager/tests/unit/datamodel/test_management_schema.py @@ -6,7 +6,7 @@ from knot_resolver_manager.datamodel.management_schema import ManagementSchema from knot_resolver_manager.utils.modeling.exceptions import DataValidationError -@pytest.mark.parametrize("val", [{"interface": "::1@53"}, {"unix-socket": "/path/socket"}]) +@pytest.mark.parametrize("val", [{"interface": "::1@53"}, {"unix-socket": "/tmp/socket"}]) def test_management_valid(val: Dict[str, Any]): o = ManagementSchema(val) if o.interface: @@ -15,7 +15,7 @@ def test_management_valid(val: Dict[str, Any]): assert str(o.unix_socket) == val["unix-socket"] -@pytest.mark.parametrize("val", [None, {"interface": "::1@53", "unix-socket": "/path/socket"}]) +@pytest.mark.parametrize("val", [None, {"interface": "::1@53", "unix-socket": "/tmp/socket"}]) def test_management_invalid(val: Optional[Dict[str, Any]]): with pytest.raises(DataValidationError): ManagementSchema(val) diff --git a/manager/tests/unit/datamodel/test_network_schema.py b/manager/tests/unit/datamodel/test_network_schema.py index 59f105b2f..1a398b50a 100644 --- a/manager/tests/unit/datamodel/test_network_schema.py +++ b/manager/tests/unit/datamodel/test_network_schema.py @@ -4,8 +4,7 @@ import pytest from pytest import raises from knot_resolver_manager.datamodel.network_schema import ListenSchema, NetworkSchema -from knot_resolver_manager.datamodel.types import PortNumber -from knot_resolver_manager.datamodel.types.types import InterfaceOptionalPort +from knot_resolver_manager.datamodel.types import InterfaceOptionalPort, PortNumber from knot_resolver_manager.utils.modeling.exceptions import DataValidationError diff --git a/manager/tests/unit/datamodel/types/test_custom_types.py b/manager/tests/unit/datamodel/types/test_custom_types.py index 2f9b1c192..bb9898e7b 100644 --- a/manager/tests/unit/datamodel/types/test_custom_types.py +++ b/manager/tests/unit/datamodel/types/test_custom_types.py @@ -7,7 +7,6 @@ import pytest from pytest import raises from knot_resolver_manager.datamodel.types import ( - CheckedPath, DomainName, InterfaceName, InterfaceOptionalPort, @@ -22,6 +21,7 @@ from knot_resolver_manager.datamodel.types import ( SizeUnit, TimeUnit, ) +from knot_resolver_manager.datamodel.types import Dir from knot_resolver_manager.utils.modeling import BaseSchema @@ -85,7 +85,7 @@ def test_parsing_units(): def test_checked_path(): class TestSchema(BaseSchema): - p: CheckedPath + p: Dir assert str(TestSchema({"p": "/tmp"}).p) == "/tmp" -- 2.47.3