]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: datamodel: slices: actions validation
authorAleš Mrázek <ales.mrazek@nic.cz>
Fri, 17 Jun 2022 22:12:31 +0000 (00:12 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 17 Jun 2022 22:12:31 +0000 (00:12 +0200)
manager/knot_resolver_manager/datamodel/policy_schema.py
manager/knot_resolver_manager/datamodel/slice_schema.py
manager/tests/unit/datamodel/test_policy_schema.py

index c521d5cb433e0cf46d80edfce4a1947738851c6d..d435d32143cb0b4d889b07e93307909b16a167e4 100644 (file)
@@ -62,6 +62,58 @@ class ForwardServerSchema(SchemaNode):
     ca_file: Optional[CheckedPath] = None
 
 
+def _validate_policy_action(policy_action: Union["ActionSchema", "PolicySchema"]) -> None:
+    servers = ["mirror", "forward", "stub"]
+
+    def _field(ac: str) -> str:
+        if ac in servers:
+            return "servers"
+        return {"deny": "message"}.get(ac, ac)
+
+    configurable_actions = ["deny", "reroute", "answer"] + servers
+
+    # checking for missing mandatory fields for actions
+    field = _field(policy_action.action)
+    if policy_action.action in configurable_actions and not getattr(policy_action, field):
+        raise ValueError(f"missing mandatory field '{field}' for '{policy_action.action}' action")
+
+    # checking for unnecessary fields
+    for ac in configurable_actions + ["deny"]:
+        field = _field(ac)
+        if getattr(policy_action, field) and _field(policy_action.action) != field:
+            raise ValueError(f"'{field}' field can only be defined for '{ac}' action")
+
+    # ForwardServerSchema is valid only for 'forward' action
+    if policy_action.servers:
+        for server in policy_action.servers:  # pylint: disable=not-an-iterable
+            if policy_action.action != "forward" and isinstance(server, ForwardServerSchema):
+                raise ValueError(
+                    f"'ForwardServerSchema' in 'servers' is valid only for 'forward' action, got '{policy_action.action}'"
+                )
+
+
+class ActionSchema(SchemaNode):
+    """
+    Configuration of policy action.
+
+    ---
+    action: Policy action.
+    message: Deny message for 'deny' action.
+    reroute: Configuration for 'reroute' action.
+    answer: Answer definition for 'answer' action.
+    servers: Servers configuration for 'mirror', 'forward' and 'stub' action.
+    """
+
+    action: PolicyActionEnum
+    message: Optional[str] = None
+    reroute: Optional[List[AddressRenumberingSchema]] = None
+    answer: Optional[AnswerSchema] = None
+    servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None
+
+    def _validate(self) -> None:
+        _validate_policy_action(self)
+
+
 class PolicySchema(SchemaNode):
     """
     Configuration of policy rule.
@@ -89,30 +141,4 @@ class PolicySchema(SchemaNode):
     servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None
 
     def _validate(self) -> None:
-        servers = ["mirror", "forward", "stub"]
-
-        def _field(action: str) -> str:
-            if action in servers:
-                return "servers"
-            return {"deny": "message"}.get(action, action)
-
-        configurable_actions = ["deny", "reroute", "answer"] + servers
-
-        # checking for missing mandatory fields for actions
-        field = _field(self.action)
-        if self.action in configurable_actions and not getattr(self, field):
-            raise ValueError(f"missing mandatory field '{field}' for '{self.action}' action")
-
-        # checking for unnecessary fields
-        for action in configurable_actions + ["deny"]:
-            field = _field(action)
-            if getattr(self, field) and _field(self.action) != field:
-                raise ValueError(f"'{field}' field can only be defined for '{action}' action")
-
-        # ForwardServerSchema is valid only for 'forward' action
-        if self.servers:
-            for server in self.servers:  # pylint: disable=not-an-iterable
-                if self.action != "forward" and isinstance(server, ForwardServerSchema):
-                    raise ValueError(
-                        f"'ForwardServerSchema' in 'servers' is valid only for 'forward' action, got '{self.action}'"
-                    )
+        _validate_policy_action(self)
index a783a82fb4b924d0ad29fd983d99c29214a8fa64..68f3bd2eb49374dcc9c0cf0612d1a45e2cb2f0a9 100644 (file)
@@ -2,10 +2,9 @@ from typing import List, Optional
 
 from typing_extensions import Literal
 
+from knot_resolver_manager.datamodel.policy_schema import ActionSchema
 from knot_resolver_manager.utils import SchemaNode
 
-SlicingFunctionEnum = Literal["randomize-psl"]
-
 
 class SliceSchema(SchemaNode):
     """
@@ -14,7 +13,9 @@ class SliceSchema(SchemaNode):
     ---
     function: Slicing function that returns index based on query
     views: Use this Slice only for clients defined by views.
+    actions: Actions for slice.
     """
 
-    function: SlicingFunctionEnum = "randomize-psl"
+    function: Literal["randomize-psl"] = "randomize-psl"
     views: Optional[List[str]] = None
+    actions: List[ActionSchema]
index 7a64ef890cb858a46f2be4df23bc4e689deac684..f62000d93d346be7d729c42ae5fb1547f184fe93 100644 (file)
@@ -3,7 +3,7 @@ from typing import Any, Dict
 import pytest
 from pytest import raises
 
-from knot_resolver_manager.datamodel.policy_schema import PolicySchema
+from knot_resolver_manager.datamodel.policy_schema import ActionSchema, PolicySchema
 from knot_resolver_manager.datamodel.types import PolicyActionEnum
 from knot_resolver_manager.exceptions import KresManagerException
 from knot_resolver_manager.utils.types import get_generic_type_arguments
@@ -25,12 +25,15 @@ policy_actions = get_generic_type_arguments(PolicyActionEnum)
 @pytest.mark.parametrize("val", [item for item in policy_actions if item not in configurable_actions])
 def test_policy_action_valid(val: Any):
     PolicySchema({"action": val})
+    ActionSchema({"action": val})
 
 
 @pytest.mark.parametrize("val", [{"action": "invalid-action"}])
 def test_action_invalid(val: Dict[str, Any]):
     with raises(KresManagerException):
         PolicySchema(val)
+    with raises(KresManagerException):
+        ActionSchema(val)
 
 
 @pytest.mark.parametrize(
@@ -53,6 +56,7 @@ def test_action_invalid(val: Dict[str, Any]):
 )
 def test_policy_valid(val: Dict[str, Any]):
     PolicySchema(val)
+    ActionSchema(val)
 
 
 @pytest.mark.parametrize(
@@ -70,6 +74,8 @@ def test_policy_valid(val: Dict[str, Any]):
 def test_policy_invalid(val: Dict[str, Any]):
     with raises(KresManagerException):
         PolicySchema(val)
+    with raises(KresManagerException):
+        ActionSchema(val)
 
 
 @pytest.mark.parametrize(
@@ -79,3 +85,5 @@ def test_policy_invalid(val: Dict[str, Any]):
 def test_policy_message_invalid(val: str):
     with raises(KresManagerException):
         PolicySchema({"action": f"{val}", "message": "this is deny message"})
+    with raises(KresManagerException):
+        ActionSchema({"action": f"{val}", "message": "this is deny message"})