]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
datamodel: types: separate directory for types
authorAleš <ales.mrazek@nic.cz>
Fri, 28 Jan 2022 15:05:06 +0000 (16:05 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:53 +0000 (16:17 +0200)
- separate modules for base types and enumerations
- some  types/enums renamed

12 files changed:
manager/knot_resolver_manager/datamodel/forward_zone.py
manager/knot_resolver_manager/datamodel/policy_schema.py
manager/knot_resolver_manager/datamodel/rpz_schema.py
manager/knot_resolver_manager/datamodel/server_schema.py
manager/knot_resolver_manager/datamodel/stub_zone_schema.py
manager/knot_resolver_manager/datamodel/types/__init__.py [new file with mode: 0644]
manager/knot_resolver_manager/datamodel/types/base_types.py [new file with mode: 0644]
manager/knot_resolver_manager/datamodel/types/enums.py [new file with mode: 0644]
manager/knot_resolver_manager/datamodel/types/types.py [moved from manager/knot_resolver_manager/datamodel/types.py with 59% similarity]
manager/knot_resolver_manager/datamodel/view_schema.py
manager/setup.py
manager/tests/unit/datamodel/templates/test_policy_macros.py

index c2d59bd747dc9f7e3ce6cc53543f4511a6aad8e3..323bb69ec0ca8b967ffaab3d81c57c50d2eed910 100644 (file)
@@ -1,6 +1,6 @@
 from typing import List, Optional, Union
 
-from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, FlagsEnum, IPAddressOptionalPort
+from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, IPAddressOptionalPort, PolicyFlagEnum
 from knot_resolver_manager.utils import SchemaNode
 
 
@@ -15,4 +15,4 @@ class ForwardZoneSchema(SchemaNode):
     tls: bool = False
     servers: Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]
     views: Optional[List[str]] = None
-    options: Optional[List[FlagsEnum]] = None
+    options: Optional[List[PolicyFlagEnum]] = None
index 8e3267f95b54eea542f4b361868bb274d8601e57..ce415c92b5f14a09ef26b87d50c5dccb703befb5 100644 (file)
@@ -1,29 +1,35 @@
 from typing import List, Optional
 
 from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema
-from knot_resolver_manager.datamodel.types import ActionEnum, FlagsEnum, IPAddressOptionalPort, RecordTypeEnum, TimeUnit
+from knot_resolver_manager.datamodel.types import (
+    DNSRecordTypeEnum,
+    IPAddressOptionalPort,
+    PolicyActionEnum,
+    PolicyFlagEnum,
+    TimeUnit,
+)
 from knot_resolver_manager.utils import SchemaNode
 
 
 class FilterSchema(SchemaNode):
     suffix: Optional[str] = None
     pattern: Optional[str] = None
-    qtype: Optional[RecordTypeEnum] = None
+    qtype: Optional[DNSRecordTypeEnum] = None
 
 
 class AnswerSchema(SchemaNode):
-    qtype: RecordTypeEnum
+    qtype: DNSRecordTypeEnum
     rdata: str
     ttl: TimeUnit = TimeUnit("1s")
     nodata: bool = False
 
 
 class PolicySchema(SchemaNode):
-    action: ActionEnum
+    action: PolicyActionEnum
     order: Optional[int] = None
     filter: Optional[FilterSchema] = None
     views: Optional[List[str]] = None
-    options: Optional[List[FlagsEnum]] = None
+    options: Optional[List[PolicyFlagEnum]] = None
     message: Optional[str] = None
     reroute: Optional[List[AddressRenumberingSchema]] = None
     answer: Optional[AnswerSchema] = None
index bd0f412f6947acda6e195f860b17a1182e3b9906..5d3d12451687efc96f73646f18539b0af75bccb7 100644 (file)
@@ -1,15 +1,15 @@
 from typing import List, Optional
 
-from knot_resolver_manager.datamodel.types import ActionEnum, CheckedPath, FlagsEnum
+from knot_resolver_manager.datamodel.types import CheckedPath, PolicyActionEnum, PolicyFlagEnum
 from knot_resolver_manager.utils import SchemaNode
 
 
 class RPZSchema(SchemaNode):
-    action: ActionEnum
+    action: PolicyActionEnum
     file: CheckedPath
     watch: bool = True
     views: Optional[List[str]] = None
-    options: Optional[List[FlagsEnum]] = None
+    options: Optional[List[PolicyFlagEnum]] = None
     message: Optional[str] = None
 
     def _validate(self) -> None:
index 002adf1e29a2039144b01b90cfcabc0c3a9e8dae..9abdeafccd94797dd7f9b6ac8394a3c320337831 100644 (file)
@@ -8,10 +8,10 @@ from typing_extensions import Literal
 from knot_resolver_manager.datamodel.network_schema import listen_config_validate
 from knot_resolver_manager.datamodel.types import (
     CheckedPath,
+    DNSRecordTypeEnum,
     DomainName,
     InterfacePort,
     IPAddressPort,
-    RecordTypeEnum,
     UncheckedPath,
 )
 from knot_resolver_manager.exceptions import DataException
@@ -43,7 +43,7 @@ BackendEnum = Literal["auto", "systemd", "supervisord"]
 
 class WatchDogSchema(SchemaNode):
     qname: DomainName
-    qtype: RecordTypeEnum
+    qtype: DNSRecordTypeEnum
 
 
 class ManagementSchema(SchemaNode):
index d92107f1eaffaad1c4458cf84dd570c1f40e9c5f..4dfc5fd09008e2d1420657135ed3a4f1a09666df 100644 (file)
@@ -1,6 +1,6 @@
 from typing import List, Optional, Union
 
-from knot_resolver_manager.datamodel.types import FlagsEnum, IPAddressOptionalPort
+from knot_resolver_manager.datamodel.types import IPAddressOptionalPort, PolicyFlagEnum
 from knot_resolver_manager.utils import SchemaNode
 
 
@@ -11,4 +11,4 @@ class StubServerSchema(SchemaNode):
 class StubZoneSchema(SchemaNode):
     servers: Union[List[IPAddressOptionalPort], List[StubServerSchema]]
     views: Optional[List[str]] = None
-    options: Optional[List[FlagsEnum]] = None
+    options: Optional[List[PolicyFlagEnum]] = None
diff --git a/manager/knot_resolver_manager/datamodel/types/__init__.py b/manager/knot_resolver_manager/datamodel/types/__init__.py
new file mode 100644 (file)
index 0000000..011a170
--- /dev/null
@@ -0,0 +1,41 @@
+from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum
+from .types import (
+    CheckedPath,
+    DomainName,
+    InterfaceName,
+    InterfaceOptionalPort,
+    InterfacePort,
+    IPAddress,
+    IPAddressOptionalPort,
+    IPAddressPort,
+    IPNetwork,
+    IPv4Address,
+    IPv6Address,
+    IPv6Network96,
+    PortNumber,
+    SizeUnit,
+    TimeUnit,
+    UncheckedPath,
+)
+
+__all__ = [
+    "PolicyActionEnum",
+    "PolicyFlagEnum",
+    "DNSRecordTypeEnum",
+    "CheckedPath",
+    "DomainName",
+    "InterfaceName",
+    "InterfaceOptionalPort",
+    "InterfacePort",
+    "IPAddress",
+    "IPAddressOptionalPort",
+    "IPAddressPort",
+    "IPNetwork",
+    "IPv4Address",
+    "IPv6Address",
+    "IPv6Network96",
+    "PortNumber",
+    "SizeUnit",
+    "TimeUnit",
+    "UncheckedPath",
+]
diff --git a/manager/knot_resolver_manager/datamodel/types/base_types.py b/manager/knot_resolver_manager/datamodel/types/base_types.py
new file mode 100644 (file)
index 0000000..8615d4d
--- /dev/null
@@ -0,0 +1,193 @@
+import re
+from typing import Any, Dict, Pattern, Type
+
+from knot_resolver_manager.exceptions import SchemaException
+from knot_resolver_manager.utils import CustomValueType
+
+
+class IntBase(CustomValueType):
+    """
+    Base class to work with integer value.
+    """
+
+    _value: int
+
+    def __int__(self) -> int:
+        return self._value
+
+    def __str__(self) -> str:
+        return str(self._value)
+
+    def __eq__(self, o: object) -> bool:
+        return isinstance(o, IntBase) and o._value == self._value
+
+    def serialize(self) -> Any:
+        return self._value
+
+    @classmethod
+    def json_schema(cls: Type["IntBase"]) -> Dict[Any, Any]:
+        return {"type": "integer"}
+
+
+class StrBase(CustomValueType):
+    """
+    Base class to work with string value.
+    """
+
+    _value: str
+
+    def __int__(self) -> int:
+        raise ValueError("Can't convert string to an integer.")
+
+    def __str__(self) -> str:
+        return self._value
+
+    def to_std(self) -> str:
+        return self._value
+
+    def __hash__(self) -> int:
+        return hash(self._value)
+
+    def __eq__(self, o: object) -> bool:
+        return isinstance(o, StrBase) and o._value == self._value
+
+    def serialize(self) -> Any:
+        return self._value
+
+    @classmethod
+    def json_schema(cls: Type["StrBase"]) -> Dict[Any, Any]:
+        return {"type": "string"}
+
+
+class IntRangeBase(IntBase):
+    """
+    Base class to work with integer value in range.
+    Just inherit the class and set the values for '_min' and '_max'.
+
+    class CustomIntRange(IntRangeBase):
+        _min: int = 0
+        _max: int = 10_000
+    """
+
+    _min: int
+    _max: int
+
+    def __init__(self, source_value: Any, object_path: str = "/") -> None:
+        super().__init__(source_value)
+        if isinstance(source_value, int):
+            if not self._min <= source_value <= self._max:
+                raise SchemaException(
+                    f"Integer value {source_value} out of range <{self._min}, {self._max}>", object_path
+                )
+            self._value = source_value
+        else:
+            raise SchemaException(
+                f"Unexpected input type for integer - {type(source_value)}."
+                " Cause might be invalid format or invalid type.",
+                object_path,
+            )
+
+    @classmethod
+    def json_schema(cls: Type["IntRangeBase"]) -> Dict[Any, Any]:
+        return {"type": "integer", "minimum": 0, "maximum": 65_535}
+
+
+class PatternBase(StrBase):
+    """
+    Base class to work with string value that match regex pattern.
+    Just inherit the class and set regex pattern for '_re'.
+
+    class ABPattern(PatternBase):
+        _re: Pattern[str] = re.compile(r"ab*")
+    """
+
+    _re: Pattern[str]
+
+    def __init__(self, source_value: Any, object_path: str = "/") -> None:
+        super().__init__(source_value)
+        if isinstance(source_value, str):
+            if type(self)._re.match(source_value):
+                self._value: str = source_value
+            else:
+                raise SchemaException(f"'{source_value}' does not match '{self._re.pattern}' pattern", object_path)
+        else:
+            raise SchemaException(
+                f"Unexpected input type for string pattern - {type(source_value)}."
+                " Cause might be invalid format or invalid type.",
+                object_path,
+            )
+
+    @classmethod
+    def json_schema(cls: Type["PatternBase"]) -> Dict[Any, Any]:
+        return {"type": "string", "pattern": rf"{cls._re.pattern}"}
+
+
+class UnitBase(IntBase):
+    """
+    Base class to work with string value that match regex pattern.
+    Just inherit the class and set '_units'.
+
+    class CustomUnit(PatternBase):
+        _units = {"b": 1, "kb": 1000}
+    """
+
+    _re: Pattern[str]
+    _units: Dict[str, int]
+    _value_orig: str
+
+    def __init__(self, source_value: Any, object_path: str = "/") -> None:
+        super().__init__(source_value)
+        type(self)._re = re.compile(rf"^(\d+)({r'|'.join(type(self)._units.keys())})$")
+        if isinstance(source_value, str) and self._re.match(source_value):
+            self._value_orig = source_value
+            grouped = self._re.search(source_value)
+            if grouped:
+                val, unit = grouped.groups()
+                if unit is None:
+                    raise SchemaException(
+                        f"Missing units. Accepted units are {list(type(self)._units.keys())}", object_path
+                    )
+                elif unit not in type(self)._units:
+                    raise SchemaException(
+                        f"Used unexpected unit '{unit}' for {type(self).__name__}."
+                        f" Accepted units are {list(type(self)._units.keys())}",
+                        object_path,
+                    )
+                self._value = int(val) * type(self)._units[unit]
+            else:
+                raise SchemaException(f"{type(self._value)} Failed to convert: {self}", object_path)
+        elif isinstance(source_value, int):
+            raise SchemaException(
+                "We do not accept number without units."
+                f" Please convert the value to string an add a unit - {list(type(self)._units.keys())}",
+                object_path,
+            )
+        else:
+            raise SchemaException(
+                f"Unexpected input type for Unit type - {type(source_value)}."
+                " Cause might be invalid format or invalid type.",
+                object_path,
+            )
+
+    def __str__(self) -> str:
+        """
+        Used by Jinja2. Must return only a number.
+        """
+        return str(self._value_orig)
+
+    def __repr__(self) -> str:
+        return f"Unit[{type(self).__name__},{self._value_orig}]"
+
+    def __eq__(self, o: object) -> bool:
+        """
+        Two instances are equal when they represent the same size
+        regardless of their string representation.
+        """
+        return isinstance(o, UnitBase) and o._value == self._value
+
+    def serialize(self) -> Any:
+        return self._value_orig
+
+    @classmethod
+    def json_schema(cls: Type["UnitBase"]) -> Dict[Any, Any]:
+        return {"type": "string", "pattern": rf"{cls._re.pattern}"}
diff --git a/manager/knot_resolver_manager/datamodel/types/enums.py b/manager/knot_resolver_manager/datamodel/types/enums.py
new file mode 100644 (file)
index 0000000..6918f8b
--- /dev/null
@@ -0,0 +1,151 @@
+from typing_extensions import Literal
+
+# Policy actions
+PolicyActionEnum = Literal[
+    # Nonchain actions
+    "pass",
+    "deny",
+    "drop",
+    "refuse",
+    "tc",
+    "reroute",
+    "answer",
+    # Chain actions
+    "mirror",
+    "debug-always",
+    "debug-cache-miss",
+    "qtrace",
+    "reqtrace",
+]
+
+# FLAGS from https://knot-resolver.readthedocs.io/en/stable/lib.html?highlight=options#c.kr_qflags
+PolicyFlagEnum = Literal[
+    "no-minimize",
+    "no-ipv4",
+    "no-ipv6",
+    "tcp",
+    "resolved",
+    "await-ipv4",
+    "await-ipv6",
+    "await-cut",
+    "no-edns",
+    "cached",
+    "no-cache",
+    "expiring",
+    "allow_local",
+    "dnssec-want",
+    "dnssec-bogus",
+    "dnssec-insecure",
+    "dnssec-cd",
+    "stub",
+    "always-cut",
+    "dnssec-wexpand",
+    "permissive",
+    "strict",
+    "badcookie-again",
+    "cname",
+    "reorder-rr",
+    "trace",
+    "no-0x20",
+    "dnssec-nods",
+    "dnssec-optout",
+    "nonauth",
+    "forward",
+    "dns64-mark",
+    "cache-tried",
+    "no-ns-found",
+    "pkt-is-sane",
+    "dns64-disable",
+]
+
+# DNS records from 'kres.type' table
+DNSRecordTypeEnum = Literal[
+    "A",
+    "A6",
+    "AAAA",
+    "AFSDB",
+    "ANY",
+    "APL",
+    "ATMA",
+    "AVC",
+    "AXFR",
+    "CAA",
+    "CDNSKEY",
+    "CDS",
+    "CERT",
+    "CNAME",
+    "CSYNC",
+    "DHCID",
+    "DLV",
+    "DNAME",
+    "DNSKEY",
+    "DOA",
+    "DS",
+    "EID",
+    "EUI48",
+    "EUI64",
+    "GID",
+    "GPOS",
+    "HINFO",
+    "HIP",
+    "HTTPS",
+    "IPSECKEY",
+    "ISDN",
+    "IXFR",
+    "KEY",
+    "KX",
+    "L32",
+    "L64",
+    "LOC",
+    "LP",
+    "MAILA",
+    "MAILB",
+    "MB",
+    "MD",
+    "MF",
+    "MG",
+    "MINFO",
+    "MR",
+    "MX",
+    "NAPTR",
+    "NID",
+    "NIMLOC",
+    "NINFO",
+    "NS",
+    "NSAP",
+    "NSAP-PTR",
+    "NSEC",
+    "NSEC3",
+    "NSEC3PARAM",
+    "NULL",
+    "NXT",
+    "OPENPGPKEY",
+    "OPT",
+    "PTR",
+    "PX",
+    "RKEY",
+    "RP",
+    "RRSIG",
+    "RT",
+    "SIG",
+    "SINK",
+    "SMIMEA",
+    "SOA",
+    "SPF",
+    "SRV",
+    "SSHFP",
+    "SVCB",
+    "TA",
+    "TALINK",
+    "TKEY",
+    "TLSA",
+    "TSIG",
+    "TXT",
+    "UID",
+    "UINFO",
+    "UNSPEC",
+    "URI",
+    "WKS",
+    "X25",
+    "ZONEMD",
+]
similarity index 59%
rename from manager/knot_resolver_manager/datamodel/types.py
rename to manager/knot_resolver_manager/datamodel/types/types.py
index 99fdee4f8c46bafd2d015386a9ed21265155e1c2..d4fe7f304688d1adcf03c7c0a52902f0b46d4bee 100644 (file)
 import ipaddress
-import logging
 import re
 from pathlib import Path
-from typing import Any, Dict, Optional, Pattern, Type, Union
-
-from typing_extensions import Literal
+from typing import Any, Dict, Optional, Type, Union
 
+from knot_resolver_manager.datamodel.types.base_types import IntRangeBase, PatternBase, StrBase, UnitBase
 from knot_resolver_manager.exceptions import SchemaException
 from knot_resolver_manager.utils import CustomValueType
 
-logger = logging.getLogger(__name__)
-
-# Policy actions
-ActionEnum = Literal[
-    # Nonchain actions
-    "pass",
-    "deny",
-    "drop",
-    "refuse",
-    "tc",
-    "reroute",
-    "answer",
-    # Chain actions
-    "mirror",
-    "debug-always",
-    "debug-cache-miss",
-    "qtrace",
-    "reqtrace",
-]
-
-# FLAGS from https://knot-resolver.readthedocs.io/en/stable/lib.html?highlight=options#c.kr_qflags
-FlagsEnum = Literal[
-    "no-minimize",
-    "no-ipv4",
-    "no-ipv6",
-    "tcp",
-    "resolved",
-    "await-ipv4",
-    "await-ipv6",
-    "await-cut",
-    "no-edns",
-    "cached",
-    "no-cache",
-    "expiring",
-    "allow_local",
-    "dnssec-want",
-    "dnssec-bogus",
-    "dnssec-insecure",
-    "dnssec-cd",
-    "stub",
-    "always-cut",
-    "dnssec-wexpand",
-    "permissive",
-    "strict",
-    "badcookie-again",
-    "cname",
-    "reorder-rr",
-    "trace",
-    "no-0x20",
-    "dnssec-nods",
-    "dnssec-optout",
-    "nonauth",
-    "forward",
-    "dns64-mark",
-    "cache-tried",
-    "no-ns-found",
-    "pkt-is-sane",
-    "dns64-disable",
-]
-
-# DNS record types from 'kres.type' table
-RecordTypeEnum = Literal[
-    "A",
-    "A6",
-    "AAAA",
-    "AFSDB",
-    "ANY",
-    "APL",
-    "ATMA",
-    "AVC",
-    "AXFR",
-    "CAA",
-    "CDNSKEY",
-    "CDS",
-    "CERT",
-    "CNAME",
-    "CSYNC",
-    "DHCID",
-    "DLV",
-    "DNAME",
-    "DNSKEY",
-    "DOA",
-    "DS",
-    "EID",
-    "EUI48",
-    "EUI64",
-    "GID",
-    "GPOS",
-    "HINFO",
-    "HIP",
-    "HTTPS",
-    "IPSECKEY",
-    "ISDN",
-    "IXFR",
-    "KEY",
-    "KX",
-    "L32",
-    "L64",
-    "LOC",
-    "LP",
-    "MAILA",
-    "MAILB",
-    "MB",
-    "MD",
-    "MF",
-    "MG",
-    "MINFO",
-    "MR",
-    "MX",
-    "NAPTR",
-    "NID",
-    "NIMLOC",
-    "NINFO",
-    "NS",
-    "NSAP",
-    "NSAP-PTR",
-    "NSEC",
-    "NSEC3",
-    "NSEC3PARAM",
-    "NULL",
-    "NXT",
-    "OPENPGPKEY",
-    "OPT",
-    "PTR",
-    "PX",
-    "RKEY",
-    "RP",
-    "RRSIG",
-    "RT",
-    "SIG",
-    "SINK",
-    "SMIMEA",
-    "SOA",
-    "SPF",
-    "SRV",
-    "SSHFP",
-    "SVCB",
-    "TA",
-    "TALINK",
-    "TKEY",
-    "TLSA",
-    "TSIG",
-    "TXT",
-    "UID",
-    "UINFO",
-    "UNSPEC",
-    "URI",
-    "WKS",
-    "X25",
-    "ZONEMD",
-]
-
-
-class _IntCustomBase(CustomValueType):
-    """
-    Base class to work with integer value that is intended as a basis for other custom types.
-    """
-
-    _value: int
-
-    def __int__(self) -> int:
-        return self._value
-
-    def __str__(self) -> str:
-        return str(self._value)
-
-    def __eq__(self, o: object) -> bool:
-        return isinstance(o, _IntCustomBase) and o._value == self._value
-
-    def serialize(self) -> Any:
-        return self._value
-
-    @classmethod
-    def json_schema(cls: Type["_IntCustomBase"]) -> Dict[Any, Any]:
-        return {"type": "integer"}
-
-
-class _StrCustomBase(CustomValueType):
-    """
-    Base class to work with string value that is intended as a basis for other custom types.
-    """
-
-    _value: str
-
-    def __int__(self) -> int:
-        raise ValueError("Can't convert string to an integer.")
-
-    def __str__(self) -> str:
-        return self._value
-
-    def to_std(self) -> str:
-        return self._value
-
-    def __hash__(self) -> int:
-        return hash(self._value)
-
-    def __eq__(self, o: object) -> bool:
-        return isinstance(o, _StrCustomBase) and o._value == self._value
-
-    def serialize(self) -> Any:
-        return self._value
-
-    @classmethod
-    def json_schema(cls: Type["_StrCustomBase"]) -> Dict[Any, Any]:
-        return {"type": "string"}
-
-
-class _IntRangeBase(_IntCustomBase):
-    """
-    Base class to work with integer value ranges that is intended as a basis for other custom types.
-    Just inherit the class and set the values for _min and _max.
-
-    class CustomIntRange(_IntRangeBase):
-        _min: int = 0
-        _max: int = 10_000
-    """
-
-    _min: int
-    _max: int
-
-    def __init__(self, source_value: Any, object_path: str = "/") -> None:
-        super().__init__(source_value)
-        if isinstance(source_value, int):
-            if not self._min <= source_value <= self._max:
-                raise SchemaException(
-                    f"Integer value {source_value} out of range <{self._min}, {self._max}>", object_path
-                )
-            self._value = source_value
-        else:
-            raise SchemaException(
-                f"Unexpected input type for integer - {type(source_value)}."
-                " Cause might be invalid format or invalid type.",
-                object_path,
-            )
-
-    @classmethod
-    def json_schema(cls: Type["_IntRangeBase"]) -> Dict[Any, Any]:
-        return {"type": "integer", "minimum": cls._min, "maximum": cls._max}
-
-
-class _PatternBase(_StrCustomBase):
-    """
-    Base class to work with string value that match regex pattern.
-    Just inherit the class and set pattern for _re.
-
-    class ABPattern(_PatternBase):
-        _re: Pattern[str] = re.compile(r"ab*")
-    """
-
-    _re: Pattern[str]
 
-    def __init__(self, source_value: Any, object_path: str = "/") -> None:
-        super().__init__(source_value)
-        if isinstance(source_value, str):
-            if type(self)._re.match(source_value):
-                self._value: str = source_value
-            else:
-                raise SchemaException(f"'{source_value}' does not match '{self._re.pattern}' pattern", object_path)
-        else:
-            raise SchemaException(
-                f"Unexpected input type for string pattern - {type(source_value)}."
-                " Cause might be invalid format or invalid type.",
-                object_path,
-            )
-
-    @classmethod
-    def json_schema(cls: Type["_PatternBase"]) -> Dict[Any, Any]:
-        return {"type": "string", "pattern": rf"{cls._re.pattern}"}
-
-
-class PortNumber(_IntRangeBase):
+class PortNumber(IntRangeBase):
     _min: int = 1
     _max: int = 65_535
 
@@ -291,82 +20,14 @@ class PortNumber(_IntRangeBase):
             raise SchemaException(f"Invalid port number {port}", object_path) from e
 
 
-class Unit(CustomValueType):
-    _re: Pattern[str]
-    _units: Dict[str, int]
-
-    def __init__(self, source_value: Any, object_path: str = "/") -> None:
-        super().__init__(source_value)
-        self._value: int
-        self._value_orig: Union[str, int]
-        if isinstance(source_value, str) and type(self)._re.match(source_value):
-            self._value_orig = source_value
-            grouped = type(self)._re.search(source_value)
-            if grouped:
-                val, unit = grouped.groups()
-                if unit is None:
-                    raise SchemaException(
-                        f"Missing units. Accepted units are {list(type(self)._units.keys())}", object_path
-                    )
-                elif unit not in type(self)._units:
-                    raise SchemaException(
-                        f"Used unexpected unit '{unit}' for {type(self).__name__}."
-                        f" Accepted units are {list(type(self)._units.keys())}",
-                        object_path,
-                    )
-                self._value = int(val) * type(self)._units[unit]
-            else:
-                raise SchemaException(f"{type(self._value)} Failed to convert: {self}", object_path)
-        elif isinstance(source_value, int):
-            raise SchemaException(
-                "We do not accept number without units."
-                f" Please convert the value to string an add a unit - {list(type(self)._units.keys())}",
-                object_path,
-            )
-        else:
-            raise SchemaException(
-                f"Unexpected input type for Unit type - {type(source_value)}."
-                " Cause might be invalid format or invalid type.",
-                object_path,
-            )
-
-    def __int__(self) -> int:
-        return self._value
-
-    def __str__(self) -> str:
-        """
-        Used by Jinja2. Must return only a number.
-        """
-        return str(self._value_orig)
-
-    def __repr__(self) -> str:
-        return f"Unit[{type(self).__name__},{self._value_orig}]"
-
-    def __eq__(self, o: object) -> bool:
-        """
-        Two instances are equal when they represent the same size
-        regardless of their string representation.
-        """
-        return isinstance(o, Unit) and o._value == self._value
-
-    def serialize(self) -> Any:
-        return self._value_orig
-
-    @classmethod
-    def json_schema(cls: Type["Unit"]) -> Dict[Any, Any]:
-        return {"type": "string", "pattern": r"\d+(" + "|".join(cls._units.keys()) + ")"}
-
-
-class SizeUnit(Unit):
-    _re = re.compile(r"^([0-9]+)\s{0,1}([BKMG]){0,1}$")
+class SizeUnit(UnitBase):
     _units = {"B": 1, "K": 1024, "M": 1024 ** 2, "G": 1024 ** 3}
 
     def bytes(self) -> int:
         return self._value
 
 
-class TimeUnit(Unit):
-    _re = re.compile(r"^(\d+)\s{0,1}([smhd]s?){0,1}$")
+class TimeUnit(UnitBase):
     _units = {"ms": 1, "s": 1000, "m": 60 * 1000, "h": 3600 * 1000, "d": 24 * 3600 * 1000}
 
     def seconds(self) -> int:
@@ -376,61 +37,7 @@ class TimeUnit(Unit):
         return self._value
 
 
-class UncheckedPath(CustomValueType):
-    """
-    Wrapper around pathlib.Path object. Can represent pretty much any Path. No checks are
-    performed on the value. The value is taken as is.
-    """
-
-    def __init__(self, source_value: Any, object_path: str = "/") -> None:
-        super().__init__(source_value, object_path=object_path)
-        if isinstance(source_value, str):
-            self._value: Path = Path(source_value)
-        else:
-            raise SchemaException(
-                f"Expected file path in a string, got '{source_value}' with type '{type(source_value)}'.", object_path
-            )
-
-    def __str__(self) -> str:
-        return str(self._value)
-
-    def __eq__(self, o: object) -> bool:
-        if not isinstance(o, UncheckedPath):
-            return False
-
-        return o._value == self._value
-
-    def __int__(self) -> int:
-        raise RuntimeError("Path cannot be converted to type <int>")
-
-    def to_path(self) -> Path:
-        return self._value
-
-    def serialize(self) -> Any:
-        return str(self._value)
-
-    @classmethod
-    def json_schema(cls: Type["UncheckedPath"]) -> Dict[Any, Any]:
-        return {
-            "type": "string",
-        }
-
-
-class CheckedPath(UncheckedPath):
-    """
-    Like UncheckedPath, but the file path is checked for being valid. So no non-existent directories in the middle,
-    no symlink loops. This however means, that resolving of relative path happens while validating.
-    """
-
-    def __init__(self, source_value: Any, object_path: str = "/") -> None:
-        super().__init__(source_value, object_path=object_path)
-        try:
-            self._value = self._value.resolve(strict=False)
-        except RuntimeError as e:
-            raise SchemaException("Failed to resolve given file path. Is there a symlink loop?", object_path) from e
-
-
-class DomainName(_PatternBase):
+class DomainName(PatternBase):
     _re = re.compile(
         r"^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|"
         r"([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|"
@@ -439,11 +46,11 @@ class DomainName(_PatternBase):
     )
 
 
-class InterfaceName(_PatternBase):
+class InterfaceName(PatternBase):
     _re = re.compile(r"^[a-zA-Z0-9]+(?:[-_][a-zA-Z0-9]+)*$")
 
 
-class InterfacePort(_StrCustomBase):
+class InterfacePort(StrBase):
     if_name: InterfaceName
     port: PortNumber
 
@@ -465,7 +72,7 @@ class InterfacePort(_StrCustomBase):
             )
 
 
-class InterfaceOptionalPort(_StrCustomBase):
+class InterfaceOptionalPort(StrBase):
     if_name: InterfaceName
     port: Optional[PortNumber] = None
 
@@ -488,7 +95,7 @@ class InterfaceOptionalPort(_StrCustomBase):
             )
 
 
-class IPAddressPort(_StrCustomBase):
+class IPAddressPort(StrBase):
     addr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
     port: PortNumber
 
@@ -513,7 +120,7 @@ class IPAddressPort(_StrCustomBase):
             )
 
 
-class IPAddressOptionalPort(_StrCustomBase):
+class IPAddressOptionalPort(StrBase):
     addr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
     port: Optional[PortNumber] = None
 
@@ -712,3 +319,59 @@ class IPv6Network96(CustomValueType):
     @classmethod
     def json_schema(cls: Type["IPv6Network96"]) -> Dict[Any, Any]:
         return {"type": "string"}
+
+
+class UncheckedPath(CustomValueType):
+    """
+    Wrapper around pathlib.Path object. Can represent pretty much any Path. No checks are
+    performed on the value. The value is taken as is.
+    """
+
+    _value: Path
+
+    def __init__(self, source_value: Any, object_path: str = "/") -> None:
+        super().__init__(source_value, object_path=object_path)
+        if isinstance(source_value, str):
+            self._value: Path = Path(source_value)
+        else:
+            raise SchemaException(
+                f"Expected file path in a string, got '{source_value}' with type '{type(source_value)}'.", object_path
+            )
+
+    def __str__(self) -> str:
+        return str(self._value)
+
+    def __eq__(self, o: object) -> bool:
+        if not isinstance(o, UncheckedPath):
+            return False
+
+        return o._value == self._value
+
+    def __int__(self) -> int:
+        raise RuntimeError("Path cannot be converted to type <int>")
+
+    def to_path(self) -> Path:
+        return self._value
+
+    def serialize(self) -> Any:
+        return str(self._value)
+
+    @classmethod
+    def json_schema(cls: Type["UncheckedPath"]) -> Dict[Any, Any]:
+        return {
+            "type": "string",
+        }
+
+
+class CheckedPath(UncheckedPath):
+    """
+    Like UncheckedPath, but the file path is checked for being valid. So no non-existent directories in the middle,
+    no symlink loops. This however means, that resolving of relative path happens while validating.
+    """
+
+    def __init__(self, source_value: Any, object_path: str = "/") -> None:
+        super().__init__(source_value, object_path=object_path)
+        try:
+            self._value = self._value.resolve(strict=False)
+        except RuntimeError as e:
+            raise SchemaException("Failed to resolve given file path. Is there a symlink loop?", object_path) from e
index 150fe6ea64cbd470bdf95d1c0e1fcc3f566bc01a..65e757f092832f7c89c55988593507ce9c35242c 100644 (file)
@@ -1,6 +1,6 @@
 from typing import List, Optional
 
-from knot_resolver_manager.datamodel.types import FlagsEnum, IPNetwork
+from knot_resolver_manager.datamodel.types import IPNetwork, PolicyFlagEnum
 from knot_resolver_manager.utils import SchemaNode
 
 
@@ -16,7 +16,7 @@ class ViewSchema(SchemaNode):
 
     subnets: Optional[List[IPNetwork]] = None
     tsig: Optional[List[str]] = None
-    options: Optional[List[FlagsEnum]] = None
+    options: Optional[List[PolicyFlagEnum]] = None
 
     def _validate(self) -> None:
         if self.tsig is None and self.subnets is None:
index 6f9c864bbac002057dfb917993e8f9f96204047e..8f3e04f1aa7037912e41a367ceccd22fca4a3e50 100644 (file)
@@ -6,6 +6,7 @@ packages = \
  'knot_resolver_manager.client',
  'knot_resolver_manager.compat',
  'knot_resolver_manager.datamodel',
+ 'knot_resolver_manager.datamodel.types',
  'knot_resolver_manager.kresd_controller',
  'knot_resolver_manager.kresd_controller.supervisord',
  'knot_resolver_manager.kresd_controller.systemd',
index cc8b941ddd6f0325205f301ab1dcff01291d7ea7..129055a8b23a7bc58cd4c86b08b3125d48f29d09 100644 (file)
@@ -3,7 +3,7 @@ from typing import List
 from knot_resolver_manager.datamodel.config_schema import template_from_str
 from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema
 from knot_resolver_manager.datamodel.policy_schema import AnswerSchema
-from knot_resolver_manager.datamodel.types import FlagsEnum
+from knot_resolver_manager.datamodel.types import PolicyFlagEnum
 
 
 def test_policy_add():
@@ -17,7 +17,7 @@ def test_policy_add():
 
 
 def test_policy_flags():
-    flags: List[FlagsEnum] = ["no-cache", "no-edns"]
+    flags: List[PolicyFlagEnum] = ["no-cache", "no-edns"]
     tmpl_str = """{% from 'macros/policy_macros.lua.j2' import policy_flags %}
 {{ policy_flags(flags) }}"""