From: Aleš Date: Fri, 28 Jan 2022 15:05:06 +0000 (+0100) Subject: datamodel: types: separate directory for types X-Git-Tag: v6.0.0a1~45^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3329c7fa2986f838744c25896ae640ef2d1d30ad;p=thirdparty%2Fknot-resolver.git datamodel: types: separate directory for types - separate modules for base types and enumerations - some types/enums renamed --- diff --git a/manager/knot_resolver_manager/datamodel/forward_zone.py b/manager/knot_resolver_manager/datamodel/forward_zone.py index c2d59bd74..323bb69ec 100644 --- a/manager/knot_resolver_manager/datamodel/forward_zone.py +++ b/manager/knot_resolver_manager/datamodel/forward_zone.py @@ -1,6 +1,6 @@ from typing import List, Optional, Union -from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, FlagsEnum, IPAddressOptionalPort +from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, IPAddressOptionalPort, PolicyFlagEnum from knot_resolver_manager.utils import SchemaNode @@ -15,4 +15,4 @@ class ForwardZoneSchema(SchemaNode): tls: bool = False servers: Union[List[IPAddressOptionalPort], List[ForwardServerSchema]] views: Optional[List[str]] = None - options: Optional[List[FlagsEnum]] = None + options: Optional[List[PolicyFlagEnum]] = None diff --git a/manager/knot_resolver_manager/datamodel/policy_schema.py b/manager/knot_resolver_manager/datamodel/policy_schema.py index 8e3267f95..ce415c92b 100644 --- a/manager/knot_resolver_manager/datamodel/policy_schema.py +++ b/manager/knot_resolver_manager/datamodel/policy_schema.py @@ -1,29 +1,35 @@ from typing import List, Optional from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema -from knot_resolver_manager.datamodel.types import ActionEnum, FlagsEnum, IPAddressOptionalPort, RecordTypeEnum, TimeUnit +from knot_resolver_manager.datamodel.types import ( + DNSRecordTypeEnum, + IPAddressOptionalPort, + PolicyActionEnum, + PolicyFlagEnum, + TimeUnit, +) from knot_resolver_manager.utils import SchemaNode class FilterSchema(SchemaNode): suffix: Optional[str] = None pattern: Optional[str] = None - qtype: Optional[RecordTypeEnum] = None + qtype: Optional[DNSRecordTypeEnum] = None class AnswerSchema(SchemaNode): - qtype: RecordTypeEnum + qtype: DNSRecordTypeEnum rdata: str ttl: TimeUnit = TimeUnit("1s") nodata: bool = False class PolicySchema(SchemaNode): - action: ActionEnum + action: PolicyActionEnum order: Optional[int] = None filter: Optional[FilterSchema] = None views: Optional[List[str]] = None - options: Optional[List[FlagsEnum]] = None + options: Optional[List[PolicyFlagEnum]] = None message: Optional[str] = None reroute: Optional[List[AddressRenumberingSchema]] = None answer: Optional[AnswerSchema] = None diff --git a/manager/knot_resolver_manager/datamodel/rpz_schema.py b/manager/knot_resolver_manager/datamodel/rpz_schema.py index bd0f412f6..5d3d12451 100644 --- a/manager/knot_resolver_manager/datamodel/rpz_schema.py +++ b/manager/knot_resolver_manager/datamodel/rpz_schema.py @@ -1,15 +1,15 @@ from typing import List, Optional -from knot_resolver_manager.datamodel.types import ActionEnum, CheckedPath, FlagsEnum +from knot_resolver_manager.datamodel.types import CheckedPath, PolicyActionEnum, PolicyFlagEnum from knot_resolver_manager.utils import SchemaNode class RPZSchema(SchemaNode): - action: ActionEnum + action: PolicyActionEnum file: CheckedPath watch: bool = True views: Optional[List[str]] = None - options: Optional[List[FlagsEnum]] = None + options: Optional[List[PolicyFlagEnum]] = None message: Optional[str] = None def _validate(self) -> None: diff --git a/manager/knot_resolver_manager/datamodel/server_schema.py b/manager/knot_resolver_manager/datamodel/server_schema.py index 002adf1e2..9abdeafcc 100644 --- a/manager/knot_resolver_manager/datamodel/server_schema.py +++ b/manager/knot_resolver_manager/datamodel/server_schema.py @@ -8,10 +8,10 @@ from typing_extensions import Literal from knot_resolver_manager.datamodel.network_schema import listen_config_validate from knot_resolver_manager.datamodel.types import ( CheckedPath, + DNSRecordTypeEnum, DomainName, InterfacePort, IPAddressPort, - RecordTypeEnum, UncheckedPath, ) from knot_resolver_manager.exceptions import DataException @@ -43,7 +43,7 @@ BackendEnum = Literal["auto", "systemd", "supervisord"] class WatchDogSchema(SchemaNode): qname: DomainName - qtype: RecordTypeEnum + qtype: DNSRecordTypeEnum class ManagementSchema(SchemaNode): diff --git a/manager/knot_resolver_manager/datamodel/stub_zone_schema.py b/manager/knot_resolver_manager/datamodel/stub_zone_schema.py index d92107f1e..4dfc5fd09 100644 --- a/manager/knot_resolver_manager/datamodel/stub_zone_schema.py +++ b/manager/knot_resolver_manager/datamodel/stub_zone_schema.py @@ -1,6 +1,6 @@ from typing import List, Optional, Union -from knot_resolver_manager.datamodel.types import FlagsEnum, IPAddressOptionalPort +from knot_resolver_manager.datamodel.types import IPAddressOptionalPort, PolicyFlagEnum from knot_resolver_manager.utils import SchemaNode @@ -11,4 +11,4 @@ class StubServerSchema(SchemaNode): class StubZoneSchema(SchemaNode): servers: Union[List[IPAddressOptionalPort], List[StubServerSchema]] views: Optional[List[str]] = None - options: Optional[List[FlagsEnum]] = None + options: Optional[List[PolicyFlagEnum]] = None diff --git a/manager/knot_resolver_manager/datamodel/types/__init__.py b/manager/knot_resolver_manager/datamodel/types/__init__.py new file mode 100644 index 000000000..011a1703a --- /dev/null +++ b/manager/knot_resolver_manager/datamodel/types/__init__.py @@ -0,0 +1,41 @@ +from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum +from .types import ( + CheckedPath, + DomainName, + InterfaceName, + InterfaceOptionalPort, + InterfacePort, + IPAddress, + IPAddressOptionalPort, + IPAddressPort, + IPNetwork, + IPv4Address, + IPv6Address, + IPv6Network96, + PortNumber, + SizeUnit, + TimeUnit, + UncheckedPath, +) + +__all__ = [ + "PolicyActionEnum", + "PolicyFlagEnum", + "DNSRecordTypeEnum", + "CheckedPath", + "DomainName", + "InterfaceName", + "InterfaceOptionalPort", + "InterfacePort", + "IPAddress", + "IPAddressOptionalPort", + "IPAddressPort", + "IPNetwork", + "IPv4Address", + "IPv6Address", + "IPv6Network96", + "PortNumber", + "SizeUnit", + "TimeUnit", + "UncheckedPath", +] diff --git a/manager/knot_resolver_manager/datamodel/types/base_types.py b/manager/knot_resolver_manager/datamodel/types/base_types.py new file mode 100644 index 000000000..8615d4d29 --- /dev/null +++ b/manager/knot_resolver_manager/datamodel/types/base_types.py @@ -0,0 +1,193 @@ +import re +from typing import Any, Dict, Pattern, Type + +from knot_resolver_manager.exceptions import SchemaException +from knot_resolver_manager.utils import CustomValueType + + +class IntBase(CustomValueType): + """ + Base class to work with integer value. + """ + + _value: int + + def __int__(self) -> int: + return self._value + + def __str__(self) -> str: + return str(self._value) + + def __eq__(self, o: object) -> bool: + return isinstance(o, IntBase) and o._value == self._value + + def serialize(self) -> Any: + return self._value + + @classmethod + def json_schema(cls: Type["IntBase"]) -> Dict[Any, Any]: + return {"type": "integer"} + + +class StrBase(CustomValueType): + """ + Base class to work with string value. + """ + + _value: str + + def __int__(self) -> int: + raise ValueError("Can't convert string to an integer.") + + def __str__(self) -> str: + return self._value + + def to_std(self) -> str: + return self._value + + def __hash__(self) -> int: + return hash(self._value) + + def __eq__(self, o: object) -> bool: + return isinstance(o, StrBase) and o._value == self._value + + def serialize(self) -> Any: + return self._value + + @classmethod + def json_schema(cls: Type["StrBase"]) -> Dict[Any, Any]: + return {"type": "string"} + + +class IntRangeBase(IntBase): + """ + Base class to work with integer value in range. + Just inherit the class and set the values for '_min' and '_max'. + + class CustomIntRange(IntRangeBase): + _min: int = 0 + _max: int = 10_000 + """ + + _min: int + _max: int + + def __init__(self, source_value: Any, object_path: str = "/") -> None: + super().__init__(source_value) + if isinstance(source_value, int): + if not self._min <= source_value <= self._max: + raise SchemaException( + f"Integer value {source_value} out of range <{self._min}, {self._max}>", object_path + ) + self._value = source_value + else: + raise SchemaException( + f"Unexpected input type for integer - {type(source_value)}." + " Cause might be invalid format or invalid type.", + object_path, + ) + + @classmethod + def json_schema(cls: Type["IntRangeBase"]) -> Dict[Any, Any]: + return {"type": "integer", "minimum": 0, "maximum": 65_535} + + +class PatternBase(StrBase): + """ + Base class to work with string value that match regex pattern. + Just inherit the class and set regex pattern for '_re'. + + class ABPattern(PatternBase): + _re: Pattern[str] = re.compile(r"ab*") + """ + + _re: Pattern[str] + + def __init__(self, source_value: Any, object_path: str = "/") -> None: + super().__init__(source_value) + if isinstance(source_value, str): + if type(self)._re.match(source_value): + self._value: str = source_value + else: + raise SchemaException(f"'{source_value}' does not match '{self._re.pattern}' pattern", object_path) + else: + raise SchemaException( + f"Unexpected input type for string pattern - {type(source_value)}." + " Cause might be invalid format or invalid type.", + object_path, + ) + + @classmethod + def json_schema(cls: Type["PatternBase"]) -> Dict[Any, Any]: + return {"type": "string", "pattern": rf"{cls._re.pattern}"} + + +class UnitBase(IntBase): + """ + Base class to work with string value that match regex pattern. + Just inherit the class and set '_units'. + + class CustomUnit(PatternBase): + _units = {"b": 1, "kb": 1000} + """ + + _re: Pattern[str] + _units: Dict[str, int] + _value_orig: str + + def __init__(self, source_value: Any, object_path: str = "/") -> None: + super().__init__(source_value) + type(self)._re = re.compile(rf"^(\d+)({r'|'.join(type(self)._units.keys())})$") + if isinstance(source_value, str) and self._re.match(source_value): + self._value_orig = source_value + grouped = self._re.search(source_value) + if grouped: + val, unit = grouped.groups() + if unit is None: + raise SchemaException( + f"Missing units. Accepted units are {list(type(self)._units.keys())}", object_path + ) + elif unit not in type(self)._units: + raise SchemaException( + f"Used unexpected unit '{unit}' for {type(self).__name__}." + f" Accepted units are {list(type(self)._units.keys())}", + object_path, + ) + self._value = int(val) * type(self)._units[unit] + else: + raise SchemaException(f"{type(self._value)} Failed to convert: {self}", object_path) + elif isinstance(source_value, int): + raise SchemaException( + "We do not accept number without units." + f" Please convert the value to string an add a unit - {list(type(self)._units.keys())}", + object_path, + ) + else: + raise SchemaException( + f"Unexpected input type for Unit type - {type(source_value)}." + " Cause might be invalid format or invalid type.", + object_path, + ) + + def __str__(self) -> str: + """ + Used by Jinja2. Must return only a number. + """ + return str(self._value_orig) + + def __repr__(self) -> str: + return f"Unit[{type(self).__name__},{self._value_orig}]" + + def __eq__(self, o: object) -> bool: + """ + Two instances are equal when they represent the same size + regardless of their string representation. + """ + return isinstance(o, UnitBase) and o._value == self._value + + def serialize(self) -> Any: + return self._value_orig + + @classmethod + def json_schema(cls: Type["UnitBase"]) -> Dict[Any, Any]: + return {"type": "string", "pattern": rf"{cls._re.pattern}"} diff --git a/manager/knot_resolver_manager/datamodel/types/enums.py b/manager/knot_resolver_manager/datamodel/types/enums.py new file mode 100644 index 000000000..6918f8b1a --- /dev/null +++ b/manager/knot_resolver_manager/datamodel/types/enums.py @@ -0,0 +1,151 @@ +from typing_extensions import Literal + +# Policy actions +PolicyActionEnum = Literal[ + # Nonchain actions + "pass", + "deny", + "drop", + "refuse", + "tc", + "reroute", + "answer", + # Chain actions + "mirror", + "debug-always", + "debug-cache-miss", + "qtrace", + "reqtrace", +] + +# FLAGS from https://knot-resolver.readthedocs.io/en/stable/lib.html?highlight=options#c.kr_qflags +PolicyFlagEnum = Literal[ + "no-minimize", + "no-ipv4", + "no-ipv6", + "tcp", + "resolved", + "await-ipv4", + "await-ipv6", + "await-cut", + "no-edns", + "cached", + "no-cache", + "expiring", + "allow_local", + "dnssec-want", + "dnssec-bogus", + "dnssec-insecure", + "dnssec-cd", + "stub", + "always-cut", + "dnssec-wexpand", + "permissive", + "strict", + "badcookie-again", + "cname", + "reorder-rr", + "trace", + "no-0x20", + "dnssec-nods", + "dnssec-optout", + "nonauth", + "forward", + "dns64-mark", + "cache-tried", + "no-ns-found", + "pkt-is-sane", + "dns64-disable", +] + +# DNS records from 'kres.type' table +DNSRecordTypeEnum = Literal[ + "A", + "A6", + "AAAA", + "AFSDB", + "ANY", + "APL", + "ATMA", + "AVC", + "AXFR", + "CAA", + "CDNSKEY", + "CDS", + "CERT", + "CNAME", + "CSYNC", + "DHCID", + "DLV", + "DNAME", + "DNSKEY", + "DOA", + "DS", + "EID", + "EUI48", + "EUI64", + "GID", + "GPOS", + "HINFO", + "HIP", + "HTTPS", + "IPSECKEY", + "ISDN", + "IXFR", + "KEY", + "KX", + "L32", + "L64", + "LOC", + "LP", + "MAILA", + "MAILB", + "MB", + "MD", + "MF", + "MG", + "MINFO", + "MR", + "MX", + "NAPTR", + "NID", + "NIMLOC", + "NINFO", + "NS", + "NSAP", + "NSAP-PTR", + "NSEC", + "NSEC3", + "NSEC3PARAM", + "NULL", + "NXT", + "OPENPGPKEY", + "OPT", + "PTR", + "PX", + "RKEY", + "RP", + "RRSIG", + "RT", + "SIG", + "SINK", + "SMIMEA", + "SOA", + "SPF", + "SRV", + "SSHFP", + "SVCB", + "TA", + "TALINK", + "TKEY", + "TLSA", + "TSIG", + "TXT", + "UID", + "UINFO", + "UNSPEC", + "URI", + "WKS", + "X25", + "ZONEMD", +] diff --git a/manager/knot_resolver_manager/datamodel/types.py b/manager/knot_resolver_manager/datamodel/types/types.py similarity index 59% rename from manager/knot_resolver_manager/datamodel/types.py rename to manager/knot_resolver_manager/datamodel/types/types.py index 99fdee4f8..d4fe7f304 100644 --- a/manager/knot_resolver_manager/datamodel/types.py +++ b/manager/knot_resolver_manager/datamodel/types/types.py @@ -1,285 +1,14 @@ import ipaddress -import logging import re from pathlib import Path -from typing import Any, Dict, Optional, Pattern, Type, Union - -from typing_extensions import Literal +from typing import Any, Dict, Optional, Type, Union +from knot_resolver_manager.datamodel.types.base_types import IntRangeBase, PatternBase, StrBase, UnitBase from knot_resolver_manager.exceptions import SchemaException from knot_resolver_manager.utils import CustomValueType -logger = logging.getLogger(__name__) - -# Policy actions -ActionEnum = Literal[ - # Nonchain actions - "pass", - "deny", - "drop", - "refuse", - "tc", - "reroute", - "answer", - # Chain actions - "mirror", - "debug-always", - "debug-cache-miss", - "qtrace", - "reqtrace", -] - -# FLAGS from https://knot-resolver.readthedocs.io/en/stable/lib.html?highlight=options#c.kr_qflags -FlagsEnum = Literal[ - "no-minimize", - "no-ipv4", - "no-ipv6", - "tcp", - "resolved", - "await-ipv4", - "await-ipv6", - "await-cut", - "no-edns", - "cached", - "no-cache", - "expiring", - "allow_local", - "dnssec-want", - "dnssec-bogus", - "dnssec-insecure", - "dnssec-cd", - "stub", - "always-cut", - "dnssec-wexpand", - "permissive", - "strict", - "badcookie-again", - "cname", - "reorder-rr", - "trace", - "no-0x20", - "dnssec-nods", - "dnssec-optout", - "nonauth", - "forward", - "dns64-mark", - "cache-tried", - "no-ns-found", - "pkt-is-sane", - "dns64-disable", -] - -# DNS record types from 'kres.type' table -RecordTypeEnum = Literal[ - "A", - "A6", - "AAAA", - "AFSDB", - "ANY", - "APL", - "ATMA", - "AVC", - "AXFR", - "CAA", - "CDNSKEY", - "CDS", - "CERT", - "CNAME", - "CSYNC", - "DHCID", - "DLV", - "DNAME", - "DNSKEY", - "DOA", - "DS", - "EID", - "EUI48", - "EUI64", - "GID", - "GPOS", - "HINFO", - "HIP", - "HTTPS", - "IPSECKEY", - "ISDN", - "IXFR", - "KEY", - "KX", - "L32", - "L64", - "LOC", - "LP", - "MAILA", - "MAILB", - "MB", - "MD", - "MF", - "MG", - "MINFO", - "MR", - "MX", - "NAPTR", - "NID", - "NIMLOC", - "NINFO", - "NS", - "NSAP", - "NSAP-PTR", - "NSEC", - "NSEC3", - "NSEC3PARAM", - "NULL", - "NXT", - "OPENPGPKEY", - "OPT", - "PTR", - "PX", - "RKEY", - "RP", - "RRSIG", - "RT", - "SIG", - "SINK", - "SMIMEA", - "SOA", - "SPF", - "SRV", - "SSHFP", - "SVCB", - "TA", - "TALINK", - "TKEY", - "TLSA", - "TSIG", - "TXT", - "UID", - "UINFO", - "UNSPEC", - "URI", - "WKS", - "X25", - "ZONEMD", -] - - -class _IntCustomBase(CustomValueType): - """ - Base class to work with integer value that is intended as a basis for other custom types. - """ - - _value: int - - def __int__(self) -> int: - return self._value - - def __str__(self) -> str: - return str(self._value) - - def __eq__(self, o: object) -> bool: - return isinstance(o, _IntCustomBase) and o._value == self._value - - def serialize(self) -> Any: - return self._value - - @classmethod - def json_schema(cls: Type["_IntCustomBase"]) -> Dict[Any, Any]: - return {"type": "integer"} - - -class _StrCustomBase(CustomValueType): - """ - Base class to work with string value that is intended as a basis for other custom types. - """ - - _value: str - - def __int__(self) -> int: - raise ValueError("Can't convert string to an integer.") - - def __str__(self) -> str: - return self._value - - def to_std(self) -> str: - return self._value - - def __hash__(self) -> int: - return hash(self._value) - - def __eq__(self, o: object) -> bool: - return isinstance(o, _StrCustomBase) and o._value == self._value - - def serialize(self) -> Any: - return self._value - - @classmethod - def json_schema(cls: Type["_StrCustomBase"]) -> Dict[Any, Any]: - return {"type": "string"} - - -class _IntRangeBase(_IntCustomBase): - """ - Base class to work with integer value ranges that is intended as a basis for other custom types. - Just inherit the class and set the values for _min and _max. - - class CustomIntRange(_IntRangeBase): - _min: int = 0 - _max: int = 10_000 - """ - - _min: int - _max: int - - def __init__(self, source_value: Any, object_path: str = "/") -> None: - super().__init__(source_value) - if isinstance(source_value, int): - if not self._min <= source_value <= self._max: - raise SchemaException( - f"Integer value {source_value} out of range <{self._min}, {self._max}>", object_path - ) - self._value = source_value - else: - raise SchemaException( - f"Unexpected input type for integer - {type(source_value)}." - " Cause might be invalid format or invalid type.", - object_path, - ) - - @classmethod - def json_schema(cls: Type["_IntRangeBase"]) -> Dict[Any, Any]: - return {"type": "integer", "minimum": cls._min, "maximum": cls._max} - - -class _PatternBase(_StrCustomBase): - """ - Base class to work with string value that match regex pattern. - Just inherit the class and set pattern for _re. - - class ABPattern(_PatternBase): - _re: Pattern[str] = re.compile(r"ab*") - """ - - _re: Pattern[str] - def __init__(self, source_value: Any, object_path: str = "/") -> None: - super().__init__(source_value) - if isinstance(source_value, str): - if type(self)._re.match(source_value): - self._value: str = source_value - else: - raise SchemaException(f"'{source_value}' does not match '{self._re.pattern}' pattern", object_path) - else: - raise SchemaException( - f"Unexpected input type for string pattern - {type(source_value)}." - " Cause might be invalid format or invalid type.", - object_path, - ) - - @classmethod - def json_schema(cls: Type["_PatternBase"]) -> Dict[Any, Any]: - return {"type": "string", "pattern": rf"{cls._re.pattern}"} - - -class PortNumber(_IntRangeBase): +class PortNumber(IntRangeBase): _min: int = 1 _max: int = 65_535 @@ -291,82 +20,14 @@ class PortNumber(_IntRangeBase): raise SchemaException(f"Invalid port number {port}", object_path) from e -class Unit(CustomValueType): - _re: Pattern[str] - _units: Dict[str, int] - - def __init__(self, source_value: Any, object_path: str = "/") -> None: - super().__init__(source_value) - self._value: int - self._value_orig: Union[str, int] - if isinstance(source_value, str) and type(self)._re.match(source_value): - self._value_orig = source_value - grouped = type(self)._re.search(source_value) - if grouped: - val, unit = grouped.groups() - if unit is None: - raise SchemaException( - f"Missing units. Accepted units are {list(type(self)._units.keys())}", object_path - ) - elif unit not in type(self)._units: - raise SchemaException( - f"Used unexpected unit '{unit}' for {type(self).__name__}." - f" Accepted units are {list(type(self)._units.keys())}", - object_path, - ) - self._value = int(val) * type(self)._units[unit] - else: - raise SchemaException(f"{type(self._value)} Failed to convert: {self}", object_path) - elif isinstance(source_value, int): - raise SchemaException( - "We do not accept number without units." - f" Please convert the value to string an add a unit - {list(type(self)._units.keys())}", - object_path, - ) - else: - raise SchemaException( - f"Unexpected input type for Unit type - {type(source_value)}." - " Cause might be invalid format or invalid type.", - object_path, - ) - - def __int__(self) -> int: - return self._value - - def __str__(self) -> str: - """ - Used by Jinja2. Must return only a number. - """ - return str(self._value_orig) - - def __repr__(self) -> str: - return f"Unit[{type(self).__name__},{self._value_orig}]" - - def __eq__(self, o: object) -> bool: - """ - Two instances are equal when they represent the same size - regardless of their string representation. - """ - return isinstance(o, Unit) and o._value == self._value - - def serialize(self) -> Any: - return self._value_orig - - @classmethod - def json_schema(cls: Type["Unit"]) -> Dict[Any, Any]: - return {"type": "string", "pattern": r"\d+(" + "|".join(cls._units.keys()) + ")"} - - -class SizeUnit(Unit): - _re = re.compile(r"^([0-9]+)\s{0,1}([BKMG]){0,1}$") +class SizeUnit(UnitBase): _units = {"B": 1, "K": 1024, "M": 1024 ** 2, "G": 1024 ** 3} def bytes(self) -> int: return self._value -class TimeUnit(Unit): - _re = re.compile(r"^(\d+)\s{0,1}([smhd]s?){0,1}$") +class TimeUnit(UnitBase): _units = {"ms": 1, "s": 1000, "m": 60 * 1000, "h": 3600 * 1000, "d": 24 * 3600 * 1000} def seconds(self) -> int: @@ -376,61 +37,7 @@ class TimeUnit(Unit): return self._value -class UncheckedPath(CustomValueType): - """ - Wrapper around pathlib.Path object. Can represent pretty much any Path. No checks are - performed on the value. The value is taken as is. - """ - - def __init__(self, source_value: Any, object_path: str = "/") -> None: - super().__init__(source_value, object_path=object_path) - if isinstance(source_value, str): - self._value: Path = Path(source_value) - else: - raise SchemaException( - f"Expected file path in a string, got '{source_value}' with type '{type(source_value)}'.", object_path - ) - - def __str__(self) -> str: - return str(self._value) - - def __eq__(self, o: object) -> bool: - if not isinstance(o, UncheckedPath): - return False - - return o._value == self._value - - def __int__(self) -> int: - raise RuntimeError("Path cannot be converted to type ") - - def to_path(self) -> Path: - return self._value - - def serialize(self) -> Any: - return str(self._value) - - @classmethod - def json_schema(cls: Type["UncheckedPath"]) -> Dict[Any, Any]: - return { - "type": "string", - } - - -class CheckedPath(UncheckedPath): - """ - Like UncheckedPath, but the file path is checked for being valid. So no non-existent directories in the middle, - no symlink loops. This however means, that resolving of relative path happens while validating. - """ - - def __init__(self, source_value: Any, object_path: str = "/") -> None: - super().__init__(source_value, object_path=object_path) - try: - self._value = self._value.resolve(strict=False) - except RuntimeError as e: - raise SchemaException("Failed to resolve given file path. Is there a symlink loop?", object_path) from e - - -class DomainName(_PatternBase): +class DomainName(PatternBase): _re = re.compile( r"^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|" r"([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|" @@ -439,11 +46,11 @@ class DomainName(_PatternBase): ) -class InterfaceName(_PatternBase): +class InterfaceName(PatternBase): _re = re.compile(r"^[a-zA-Z0-9]+(?:[-_][a-zA-Z0-9]+)*$") -class InterfacePort(_StrCustomBase): +class InterfacePort(StrBase): if_name: InterfaceName port: PortNumber @@ -465,7 +72,7 @@ class InterfacePort(_StrCustomBase): ) -class InterfaceOptionalPort(_StrCustomBase): +class InterfaceOptionalPort(StrBase): if_name: InterfaceName port: Optional[PortNumber] = None @@ -488,7 +95,7 @@ class InterfaceOptionalPort(_StrCustomBase): ) -class IPAddressPort(_StrCustomBase): +class IPAddressPort(StrBase): addr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address] port: PortNumber @@ -513,7 +120,7 @@ class IPAddressPort(_StrCustomBase): ) -class IPAddressOptionalPort(_StrCustomBase): +class IPAddressOptionalPort(StrBase): addr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address] port: Optional[PortNumber] = None @@ -712,3 +319,59 @@ class IPv6Network96(CustomValueType): @classmethod def json_schema(cls: Type["IPv6Network96"]) -> Dict[Any, Any]: return {"type": "string"} + + +class UncheckedPath(CustomValueType): + """ + Wrapper around pathlib.Path object. Can represent pretty much any Path. No checks are + performed on the value. The value is taken as is. + """ + + _value: Path + + def __init__(self, source_value: Any, object_path: str = "/") -> None: + super().__init__(source_value, object_path=object_path) + if isinstance(source_value, str): + self._value: Path = Path(source_value) + else: + raise SchemaException( + f"Expected file path in a string, got '{source_value}' with type '{type(source_value)}'.", object_path + ) + + def __str__(self) -> str: + return str(self._value) + + def __eq__(self, o: object) -> bool: + if not isinstance(o, UncheckedPath): + return False + + return o._value == self._value + + def __int__(self) -> int: + raise RuntimeError("Path cannot be converted to type ") + + def to_path(self) -> Path: + return self._value + + def serialize(self) -> Any: + return str(self._value) + + @classmethod + def json_schema(cls: Type["UncheckedPath"]) -> Dict[Any, Any]: + return { + "type": "string", + } + + +class CheckedPath(UncheckedPath): + """ + Like UncheckedPath, but the file path is checked for being valid. So no non-existent directories in the middle, + no symlink loops. This however means, that resolving of relative path happens while validating. + """ + + def __init__(self, source_value: Any, object_path: str = "/") -> None: + super().__init__(source_value, object_path=object_path) + try: + self._value = self._value.resolve(strict=False) + except RuntimeError as e: + raise SchemaException("Failed to resolve given file path. Is there a symlink loop?", object_path) from e diff --git a/manager/knot_resolver_manager/datamodel/view_schema.py b/manager/knot_resolver_manager/datamodel/view_schema.py index 150fe6ea6..65e757f09 100644 --- a/manager/knot_resolver_manager/datamodel/view_schema.py +++ b/manager/knot_resolver_manager/datamodel/view_schema.py @@ -1,6 +1,6 @@ from typing import List, Optional -from knot_resolver_manager.datamodel.types import FlagsEnum, IPNetwork +from knot_resolver_manager.datamodel.types import IPNetwork, PolicyFlagEnum from knot_resolver_manager.utils import SchemaNode @@ -16,7 +16,7 @@ class ViewSchema(SchemaNode): subnets: Optional[List[IPNetwork]] = None tsig: Optional[List[str]] = None - options: Optional[List[FlagsEnum]] = None + options: Optional[List[PolicyFlagEnum]] = None def _validate(self) -> None: if self.tsig is None and self.subnets is None: diff --git a/manager/setup.py b/manager/setup.py index 6f9c864bb..8f3e04f1a 100644 --- a/manager/setup.py +++ b/manager/setup.py @@ -6,6 +6,7 @@ packages = \ 'knot_resolver_manager.client', 'knot_resolver_manager.compat', 'knot_resolver_manager.datamodel', + 'knot_resolver_manager.datamodel.types', 'knot_resolver_manager.kresd_controller', 'knot_resolver_manager.kresd_controller.supervisord', 'knot_resolver_manager.kresd_controller.systemd', diff --git a/manager/tests/unit/datamodel/templates/test_policy_macros.py b/manager/tests/unit/datamodel/templates/test_policy_macros.py index cc8b941dd..129055a8b 100644 --- a/manager/tests/unit/datamodel/templates/test_policy_macros.py +++ b/manager/tests/unit/datamodel/templates/test_policy_macros.py @@ -3,7 +3,7 @@ from typing import List from knot_resolver_manager.datamodel.config_schema import template_from_str from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema from knot_resolver_manager.datamodel.policy_schema import AnswerSchema -from knot_resolver_manager.datamodel.types import FlagsEnum +from knot_resolver_manager.datamodel.types import PolicyFlagEnum def test_policy_add(): @@ -17,7 +17,7 @@ def test_policy_add(): def test_policy_flags(): - flags: List[FlagsEnum] = ["no-cache", "no-edns"] + flags: List[PolicyFlagEnum] = ["no-cache", "no-edns"] tmpl_str = """{% from 'macros/policy_macros.lua.j2' import policy_flags %} {{ policy_flags(flags) }}"""