from typing import List, Optional
-from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, SizeUnit, TimeUnit
+from knot_resolver_manager.datamodel.types import Dir, DomainName, File, SizeUnit, TimeUnit
from knot_resolver_manager.utils.modeling import BaseSchema
origin: DomainName
url: str
refresh_interval: TimeUnit = TimeUnit("1d")
- ca_file: Optional[CheckedPath] = None
+ ca_file: Optional[File] = None
def _validate(self) -> None:
if str(self.origin) != ".":
"""
garbage_collector: bool = True
- storage: CheckedPath = CheckedPath("/var/cache/knot-resolver")
+ storage: Dir = Dir("/var/cache/knot-resolver")
size_max: SizeUnit = SizeUnit("100M")
ttl_min: TimeUnit = TimeUnit("5s")
ttl_max: TimeUnit = TimeUnit("6d")
from knot_resolver_manager.datamodel.slice_schema import SliceSchema
from knot_resolver_manager.datamodel.static_hints_schema import StaticHintsSchema
from knot_resolver_manager.datamodel.stub_zone_schema import StubZoneSchema
-from knot_resolver_manager.datamodel.types.types import IntPositive, UncheckedPath
+from knot_resolver_manager.datamodel.types import AbsoluteDir, IntPositive
from knot_resolver_manager.datamodel.view_schema import ViewSchema
from knot_resolver_manager.datamodel.webmgmt_schema import WebmgmtSchema
from knot_resolver_manager.utils.modeling import BaseSchema
version: int = 1
nsid: Optional[str] = None
hostname: Optional[str] = None
- rundir: UncheckedPath = UncheckedPath(".")
+ rundir: AbsoluteDir = AbsoluteDir("/var/run/knot-resolver")
workers: Union[Literal["auto"], IntPositive] = IntPositive(1)
max_workers: IntPositive = IntPositive(_default_max_worker_count())
management: ManagementSchema = ManagementSchema({"unix-socket": "./manager.sock"})
nsid: Optional[str]
hostname: str
- rundir: UncheckedPath
+ rundir: AbsoluteDir
workers: IntPositive
max_workers: IntPositive
management: ManagementSchema
from typing_extensions import Literal
-from knot_resolver_manager.datamodel.types import CheckedPath, TimeUnit
+from knot_resolver_manager.datamodel.types import FilePath, TimeUnit
from knot_resolver_manager.utils.modeling import BaseSchema
from knot_resolver_manager.utils.modeling.base_schema import is_obj_type_valid
log_tcp_rtt: Log TCP RTT (Round-trip time).
"""
- unix_socket: CheckedPath
+ unix_socket: FilePath
log_queries: bool = True
log_responses: bool = True
log_tcp_rtt: bool = True
from typing import Optional
-from knot_resolver_manager.datamodel.types import CheckedPath, IPAddressPort
+from knot_resolver_manager.datamodel.types import FilePath, IPAddressPort
from knot_resolver_manager.utils.modeling import BaseSchema
interface: IP address and port number to listen to.
"""
- unix_socket: Optional[CheckedPath] = None
+ unix_socket: Optional[FilePath] = None
interface: Optional[IPAddressPort] = None
def _validate(self) -> None:
from typing_extensions import Literal
from knot_resolver_manager.datamodel.types import (
- CheckedPath,
+ File,
+ FilePath,
Int0_512,
Int0_65535,
InterfaceOptionalPort,
padding: EDNS(0) padding of answers to queries that arrive over TLS transport.
"""
- cert_file: Optional[CheckedPath] = None
- key_file: Optional[CheckedPath] = None
+ cert_file: Optional[File] = None
+ key_file: Optional[File] = None
sticket_secret: Optional[str] = None
- sticket_secret_file: Optional[CheckedPath] = None
+ sticket_secret_file: Optional[File] = None
auto_discovery: bool = False
padding: Union[bool, Int0_512] = True
"""
interface: Union[None, InterfaceOptionalPort, List[InterfaceOptionalPort]] = None
- unix_socket: Union[None, CheckedPath, List[CheckedPath]] = None
+ unix_socket: Union[None, FilePath, List[FilePath]] = None
port: Optional[PortNumber] = None
kind: KindEnum = "dns"
freebind: bool = False
_LAYER = Raw
interface: Union[None, InterfaceOptionalPort, List[InterfaceOptionalPort]]
- unix_socket: Union[None, CheckedPath, List[CheckedPath]]
+ unix_socket: Union[None, FilePath, List[FilePath]]
port: Optional[PortNumber]
kind: KindEnum
freebind: bool
from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema
from knot_resolver_manager.datamodel.types import (
- CheckedPath,
DNSRecordTypeEnum,
DomainName,
+ File,
IPAddressOptionalPort,
PolicyActionEnum,
PolicyFlagEnum,
address: IPAddressOptionalPort
pin_sha256: Optional[Union[str, List[str]]] = None
hostname: Optional[DomainName] = None
- ca_file: Optional[CheckedPath] = None
+ ca_file: Optional[File] = None
def _validate_policy_action(policy_action: Union["ActionSchema", "PolicySchema"]) -> None:
from typing import List, Optional
-from knot_resolver_manager.datamodel.types import CheckedPath, PolicyActionEnum, PolicyFlagEnum
+from knot_resolver_manager.datamodel.types import File, PolicyActionEnum, PolicyFlagEnum
from knot_resolver_manager.utils.modeling import BaseSchema
"""
action: PolicyActionEnum
- file: CheckedPath
+ file: File
watch: bool = True
views: Optional[List[str]] = None
options: Optional[List[PolicyFlagEnum]] = None
from typing import Dict, List, Optional
-from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, IPAddress, TimeUnit
+from knot_resolver_manager.datamodel.types import DomainName, File, IPAddress, TimeUnit
from knot_resolver_manager.utils.modeling import BaseSchema
nodata: bool = True
etc_hosts: bool = False
root_hints: Optional[Dict[DomainName, List[IPAddress]]] = None
- root_hints_file: Optional[CheckedPath] = None
+ root_hints_file: Optional[File] = None
hints: Optional[Dict[DomainName, List[IPAddress]]] = None
- hints_files: Optional[List[CheckedPath]] = None
+ hints_files: Optional[List[File]] = None
from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum
from .types import (
- CheckedPath,
+ AbsoluteDir,
+ Dir,
DomainName,
+ File,
+ FilePath,
Int0_512,
Int0_65535,
InterfaceName,
PortNumber,
SizeUnit,
TimeUnit,
- UncheckedPath,
)
__all__ = [
"PolicyActionEnum",
"PolicyFlagEnum",
"DNSRecordTypeEnum",
- "CheckedPath",
"DomainName",
"Int0_512",
"Int0_65535",
"PortNumber",
"SizeUnit",
"TimeUnit",
- "UncheckedPath",
+ "AbsoluteDir",
+ "File",
+ "FilePath",
+ "Dir",
]
import ipaddress
import re
from pathlib import Path
-from typing import Any, Dict, Optional, Type, Union
+from typing import Any, Dict, Optional, Tuple, Type, TypeVar, Union
from knot_resolver_manager.datamodel.types.base_types import IntRangeBase, PatternBase, StrBase, UnitBase
from knot_resolver_manager.utils.modeling import BaseValueType
_value: Path
- def __init__(self, source_value: Any, object_path: str = "/") -> None:
+ def __init__(
+ self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
+ ) -> None:
+
super().__init__(source_value, object_path=object_path)
+ self._object_path: str = object_path
+ self._parents: Tuple[UncheckedPath, ...] = parents
if isinstance(source_value, str):
self._raw_value: str = source_value
- self._value: Path = Path(source_value)
+ if self._parents:
+ pp = map(lambda p: p.to_path(), self._parents)
+ self._value: Path = Path(*pp, source_value)
+ else:
+ self._value: Path = Path(source_value)
else:
raise ValueError(f"expected file path in a string, got '{source_value}' with type '{type(source_value)}'.")
def serialize(self) -> Any:
return self._raw_value
+ def relative_to(self, parent: "UncheckedPath") -> "UncheckedPath":
+ """return a path with an added parent part"""
+ return UncheckedPath(self._raw_value, parents=(parent, *self._parents), object_path=self._object_path)
+
+ UPT = TypeVar("UPT", bound="UncheckedPath")
+
+ def reconstruct(self, cls: Type[UPT]) -> UPT:
+ """
+ Rebuild this object as an instance of its subclass. Practically, allows for conversions from
+ """
+ return cls(self._raw_value, parents=self._parents, object_path=self._object_path)
+
@classmethod
def json_schema(cls: Type["UncheckedPath"]) -> Dict[Any, Any]:
return {
}
+class Dir(UncheckedPath):
+ """
+ Path, that is enforced to be:
+ - an existing directory
+ """
+
+ def __init__(
+ self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
+ ) -> None:
+ super().__init__(source_value, parents=parents, object_path=object_path)
+ if not self._value.is_dir():
+ raise ValueError("path does not point to an existing directory")
+
+
+class AbsoluteDir(Dir):
+ """
+ Path, that is enforced to be:
+ - absolute
+ - an existing directory
+ """
+
+ def __init__(
+ self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
+ ) -> None:
+ super().__init__(source_value, parents=parents, object_path=object_path)
+ if not self._value.is_absolute():
+ raise ValueError("path not absolute")
+
+
+class File(UncheckedPath):
+ """
+ Path, that is enforced to be:
+ - an existing file
+ """
+
+ def __init__(
+ self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
+ ) -> None:
+ super().__init__(source_value, parents=parents, object_path=object_path)
+ if not self._value.exists():
+ raise ValueError("file does not exist")
+ if not self._value.is_file():
+ raise ValueError("path is not a file")
+
+
+class FilePath(UncheckedPath):
+ """
+ Path, that is enforced to be:
+ - parent of the last path segment is an existing directory
+ - it does not point to a dir
+ """
+
+ def __init__(
+ self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
+ ) -> None:
+ super().__init__(source_value, parents=parents, object_path=object_path)
+ p = self._value.parent
+ if not p.exists() or not p.is_dir():
+ raise ValueError("path does not point inside an existing directory")
+ if self._value.is_dir():
+ raise ValueError("path points to a directory when we expected a file")
+
+
class CheckedPath(UncheckedPath):
"""
Like UncheckedPath, but the file path is checked for being valid. So no non-existent directories in the middle,
no symlink loops. This however means, that resolving of relative path happens while validating.
"""
- def __init__(self, source_value: Any, object_path: str = "/") -> None:
- super().__init__(source_value, object_path=object_path)
+ def __init__(
+ self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
+ ) -> None:
+ super().__init__(source_value, parents=parents, object_path=object_path)
try:
self._value = self._value.resolve(strict=False)
except RuntimeError as e:
from typing import Optional
-from knot_resolver_manager.datamodel.types import CheckedPath, InterfacePort
+from knot_resolver_manager.datamodel.types import File, FilePath, InterfacePort
from knot_resolver_manager.utils.modeling import BaseSchema
key_file: Path to certificate key.
"""
- unix_socket: Optional[CheckedPath] = None
+ unix_socket: Optional[FilePath] = None
interface: Optional[InterfacePort] = None
tls: bool = False
- cert_file: Optional[CheckedPath] = None
- key_file: Optional[CheckedPath] = None
+ cert_file: Optional[File] = None
+ key_file: Optional[File] = None
def _validate(self) -> None:
if bool(self.unix_socket) == bool(self.interface):
def test_tls_servers_table():
d = ForwardServerSchema(
- {"address": "2001:DB8::d0c", "hostname": "res.example.com", "ca-file": "/etc/knot-resolver/tlsca.crt"}
+ # the ca-file is a dummy, because it's existence is checked
+ {"address": "2001:DB8::d0c", "hostname": "res.example.com", "ca-file": "/etc/passwd"}
)
t = [d, ForwardServerSchema({"address": "192.0.2.1", "pin-sha256": "YQ=="})]
tmpl_str = """{% from 'macros/common_macros.lua.j2' import tls_servers_table %}
from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
-@pytest.mark.parametrize("val", [{"interface": "::1@53"}, {"unix-socket": "/path/socket"}])
+@pytest.mark.parametrize("val", [{"interface": "::1@53"}, {"unix-socket": "/tmp/socket"}])
def test_management_valid(val: Dict[str, Any]):
o = ManagementSchema(val)
if o.interface:
assert str(o.unix_socket) == val["unix-socket"]
-@pytest.mark.parametrize("val", [None, {"interface": "::1@53", "unix-socket": "/path/socket"}])
+@pytest.mark.parametrize("val", [None, {"interface": "::1@53", "unix-socket": "/tmp/socket"}])
def test_management_invalid(val: Optional[Dict[str, Any]]):
with pytest.raises(DataValidationError):
ManagementSchema(val)
from pytest import raises
from knot_resolver_manager.datamodel.network_schema import ListenSchema, NetworkSchema
-from knot_resolver_manager.datamodel.types import PortNumber
-from knot_resolver_manager.datamodel.types.types import InterfaceOptionalPort
+from knot_resolver_manager.datamodel.types import InterfaceOptionalPort, PortNumber
from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
from pytest import raises
from knot_resolver_manager.datamodel.types import (
- CheckedPath,
DomainName,
InterfaceName,
InterfaceOptionalPort,
SizeUnit,
TimeUnit,
)
+from knot_resolver_manager.datamodel.types import Dir
from knot_resolver_manager.utils.modeling import BaseSchema
def test_checked_path():
class TestSchema(BaseSchema):
- p: CheckedPath
+ p: Dir
assert str(TestSchema({"p": "/tmp"}).p) == "/tmp"