]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
datamodel: policy: added policy actions
authorAleš <ales.mrazek@nic.cz>
Thu, 9 Dec 2021 01:16:30 +0000 (02:16 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:53 +0000 (16:17 +0200)
- tests for policy actions
- QTypeEnum: added some query-type literals

manager/knot_resolver_manager/datamodel/policy_schema.py
manager/knot_resolver_manager/datamodel/server_schema.py
manager/knot_resolver_manager/datamodel/types.py
manager/tests/unit/datamodel/test_policy_schema.py [new file with mode: 0644]

index 07851188cf7b4c3fac5ce88a533e37a048a3597b..87f74e2b6fc45891e137212091593bda864c8c44 100644 (file)
@@ -1,22 +1,20 @@
 from typing import List, Optional
 
-from knot_resolver_manager.datamodel.types import IPAddressPort, TimeUnit
+from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema
+from knot_resolver_manager.datamodel.types import ActionEnum, DomainName, IPAddressPort, QTypeEnum, TimeUnit
 from knot_resolver_manager.datamodel.view_schema import FlagsEnum
 from knot_resolver_manager.utils import SchemaNode
-from knot_resolver_manager.utils.types import LiteralEnum
-
-# TODO: add all other options
-ActionEnum = LiteralEnum["pass", "deny", "mirror", "forward", "modify"]
 
 
 class FilterSchema(SchemaNode):
-    suffix: Optional[str] = None
-    pattern: Optional[str] = None
-    query_type: Optional[str] = None
+    domain: Optional[List[DomainName]] = None
+    suffix: Optional[List[str]] = None
+    pattern: Optional[List[str]] = None
+    qtype: Optional[List[QTypeEnum]] = None
 
 
 class AnswerSchema(SchemaNode):
-    query_type: str
+    qtype: QTypeEnum
     rdata: str
     ttl: TimeUnit = TimeUnit("1s")
     no_data: bool = False
@@ -24,13 +22,30 @@ class AnswerSchema(SchemaNode):
 
 class PolicySchema(SchemaNode):
     action: ActionEnum
-    filters: Optional[List[FilterSchema]] = None
+    order: Optional[int] = None
+    filter: Optional[FilterSchema] = None
     views: Optional[List[str]] = None
     options: Optional[List[FlagsEnum]] = None
     message: Optional[str] = None
-    mirror: Optional[List[IPAddressPort]] = None
-    forward: Optional[List[IPAddressPort]] = None
+    reroute: Optional[List[AddressRenumberingSchema]] = None
     answer: Optional[AnswerSchema] = None
+    mirror: Optional[List[IPAddressPort]] = None
 
     def _validate(self) -> None:
-        pass
+        # checking for missing fields
+        if self.action == "reroute" and not self.reroute:
+            raise ValueError("missing mandatory field 'reroute' for 'reroute' action")
+        if self.action == "answer" and not self.answer:
+            raise ValueError("missing mandatory field 'answer' for 'answer' action")
+        if self.action == "mirror" and not self.mirror:
+            raise ValueError("missing mandatory field 'mirror' for 'mirror' action")
+
+        # checking for unnecessary fields
+        if self.message and not self.action == "deny":
+            raise ValueError("'message' field can only be defined for 'deny' action")
+        if self.reroute and not self.action == "reroute":
+            raise ValueError("'answer' field can only be defined for 'answer' action")
+        if self.answer and not self.action == "answer":
+            raise ValueError("'answer' field can only be defined for 'answer' action")
+        if self.mirror and not self.action == "mirror":
+            raise ValueError("'mirror' field can only be defined for 'mirror' action")
index 0762977cb50a68b5d5f6660860cba6b9d7fca390..668f75404f077d5994aae75b7c99ac57738c5dde 100644 (file)
@@ -5,7 +5,7 @@ from typing import Any, Optional, Union
 
 from typing_extensions import Literal
 
-from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, Listen, UncheckedPath
+from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, Listen, QTypeEnum, UncheckedPath
 from knot_resolver_manager.exceptions import DataException
 from knot_resolver_manager.utils import SchemaNode
 from knot_resolver_manager.utils.types import LiteralEnum
@@ -36,7 +36,7 @@ BackendEnum = LiteralEnum["auto", "systemd", "supervisord"]
 
 class WatchDogSchema(SchemaNode):
     qname: DomainName
-    qtype: str
+    qtype: QTypeEnum
 
 
 class ManagementSchema(SchemaNode):
index bf892cfba08cc916ccdb804a35c31c530baf4845..6c74416b77d42dbbadce6f29424a2e346751d5ed 100644 (file)
@@ -8,9 +8,30 @@ from typing import Any, Dict, Optional, Pattern, Type, Union
 from knot_resolver_manager.exceptions import SchemaException
 from knot_resolver_manager.utils import CustomValueType, SchemaNode
 from knot_resolver_manager.utils.modelling import Serializable
+from knot_resolver_manager.utils.types import LiteralEnum
 
 logger = logging.getLogger(__name__)
 
+ActionEnum = LiteralEnum[
+    # Nonchain actions
+    "pass",
+    "deny",
+    "drop",
+    "refuse",
+    "tc",
+    "reroute",
+    "answer",
+    # Chain actions
+    "mirror",
+    "debug-always",
+    "debug-cache-miss",
+    "qtrace",
+    "reqtrace",
+]
+
+# TODO: add other query types
+QTypeEnum = LiteralEnum["A", "AAAA", "CNAME", "MX", "NS", "ptr", "CERT", "SRV", "TXT", "SOA"]
+
 
 class Unit(CustomValueType):
     _re: Pattern[str]
diff --git a/manager/tests/unit/datamodel/test_policy_schema.py b/manager/tests/unit/datamodel/test_policy_schema.py
new file mode 100644 (file)
index 0000000..8755fd0
--- /dev/null
@@ -0,0 +1,53 @@
+from pytest import raises
+
+from knot_resolver_manager.datamodel.policy_schema import PolicySchema
+from knot_resolver_manager.exceptions import KresdManagerException
+
+
+def test_simple_actions():
+    assert PolicySchema({"action": "pass"})
+    assert PolicySchema({"action": "deny"})
+    assert PolicySchema({"action": "drop"})
+    assert PolicySchema({"action": "refuse"})
+    assert PolicySchema({"action": "tc"})
+    assert PolicySchema({"action": "debug-always"})
+    assert PolicySchema({"action": "debug-cache-miss"})
+    assert PolicySchema({"action": "qtrace"})
+    assert PolicySchema({"action": "reqtrace"})
+
+    with raises(KresdManagerException):
+        PolicySchema({"action": "invalid-action"})
+
+
+def test_deny_message():
+    assert PolicySchema({"action": "deny", "message": "this is deny message"})
+
+    with raises(KresdManagerException):
+        PolicySchema({"action": "pass", "message": "this is deny message"})
+
+
+def test_reroute():
+    assert PolicySchema({"action": "reroute", "reroute": [{"source": "192.0.2.0/24", "destination": "127.0.0.0"}]})
+
+    with raises(KresdManagerException):
+        PolicySchema({"action": "reroute"})
+    with raises(KresdManagerException):
+        PolicySchema({"action": "pass", "reroute": [{"source": "192.0.2.0/24", "destination": "127.0.0.0"}]})
+
+
+def test_answer():
+    assert PolicySchema({"action": "answer", "answer": {"qtype": "AAAA", "rdata": "::1"}})
+
+    with raises(KresdManagerException):
+        PolicySchema({"action": "answer"})
+    with raises(KresdManagerException):
+        PolicySchema({"action": "pass", "answer": {"qtype": "AAAA", "rdata": "::1"}})
+
+
+def test_mirror():
+    assert PolicySchema({"action": "mirror", "mirror": ["127.0.0.1@5353"]})
+
+    with raises(KresdManagerException):
+        PolicySchema({"action": "mirror"})
+    with raises(KresdManagerException):
+        PolicySchema({"action": "pass", "mirror": ["127.0.0.1@5353"]})