ca_file: Optional[CheckedPath] = None
+def _validate_policy_action(policy_action: Union["ActionSchema", "PolicySchema"]) -> None:
+ servers = ["mirror", "forward", "stub"]
+
+ def _field(ac: str) -> str:
+ if ac in servers:
+ return "servers"
+ return {"deny": "message"}.get(ac, ac)
+
+ configurable_actions = ["deny", "reroute", "answer"] + servers
+
+ # checking for missing mandatory fields for actions
+ field = _field(policy_action.action)
+ if policy_action.action in configurable_actions and not getattr(policy_action, field):
+ raise ValueError(f"missing mandatory field '{field}' for '{policy_action.action}' action")
+
+ # checking for unnecessary fields
+ for ac in configurable_actions + ["deny"]:
+ field = _field(ac)
+ if getattr(policy_action, field) and _field(policy_action.action) != field:
+ raise ValueError(f"'{field}' field can only be defined for '{ac}' action")
+
+ # ForwardServerSchema is valid only for 'forward' action
+ if policy_action.servers:
+ for server in policy_action.servers: # pylint: disable=not-an-iterable
+ if policy_action.action != "forward" and isinstance(server, ForwardServerSchema):
+ raise ValueError(
+ f"'ForwardServerSchema' in 'servers' is valid only for 'forward' action, got '{policy_action.action}'"
+ )
+
+
+class ActionSchema(SchemaNode):
+ """
+ Configuration of policy action.
+
+ ---
+ action: Policy action.
+ message: Deny message for 'deny' action.
+ reroute: Configuration for 'reroute' action.
+ answer: Answer definition for 'answer' action.
+ servers: Servers configuration for 'mirror', 'forward' and 'stub' action.
+ """
+
+ action: PolicyActionEnum
+ message: Optional[str] = None
+ reroute: Optional[List[AddressRenumberingSchema]] = None
+ answer: Optional[AnswerSchema] = None
+ servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None
+
+ def _validate(self) -> None:
+ _validate_policy_action(self)
+
+
class PolicySchema(SchemaNode):
"""
Configuration of policy rule.
servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None
def _validate(self) -> None:
- servers = ["mirror", "forward", "stub"]
-
- def _field(action: str) -> str:
- if action in servers:
- return "servers"
- return {"deny": "message"}.get(action, action)
-
- configurable_actions = ["deny", "reroute", "answer"] + servers
-
- # checking for missing mandatory fields for actions
- field = _field(self.action)
- if self.action in configurable_actions and not getattr(self, field):
- raise ValueError(f"missing mandatory field '{field}' for '{self.action}' action")
-
- # checking for unnecessary fields
- for action in configurable_actions + ["deny"]:
- field = _field(action)
- if getattr(self, field) and _field(self.action) != field:
- raise ValueError(f"'{field}' field can only be defined for '{action}' action")
-
- # ForwardServerSchema is valid only for 'forward' action
- if self.servers:
- for server in self.servers: # pylint: disable=not-an-iterable
- if self.action != "forward" and isinstance(server, ForwardServerSchema):
- raise ValueError(
- f"'ForwardServerSchema' in 'servers' is valid only for 'forward' action, got '{self.action}'"
- )
+ _validate_policy_action(self)
import pytest
from pytest import raises
-from knot_resolver_manager.datamodel.policy_schema import PolicySchema
+from knot_resolver_manager.datamodel.policy_schema import ActionSchema, PolicySchema
from knot_resolver_manager.datamodel.types import PolicyActionEnum
from knot_resolver_manager.exceptions import KresManagerException
from knot_resolver_manager.utils.types import get_generic_type_arguments
@pytest.mark.parametrize("val", [item for item in policy_actions if item not in configurable_actions])
def test_policy_action_valid(val: Any):
PolicySchema({"action": val})
+ ActionSchema({"action": val})
@pytest.mark.parametrize("val", [{"action": "invalid-action"}])
def test_action_invalid(val: Dict[str, Any]):
with raises(KresManagerException):
PolicySchema(val)
+ with raises(KresManagerException):
+ ActionSchema(val)
@pytest.mark.parametrize(
)
def test_policy_valid(val: Dict[str, Any]):
PolicySchema(val)
+ ActionSchema(val)
@pytest.mark.parametrize(
def test_policy_invalid(val: Dict[str, Any]):
with raises(KresManagerException):
PolicySchema(val)
+ with raises(KresManagerException):
+ ActionSchema(val)
@pytest.mark.parametrize(
def test_policy_message_invalid(val: str):
with raises(KresManagerException):
PolicySchema({"action": f"{val}", "message": "this is deny message"})
+ with raises(KresManagerException):
+ ActionSchema({"action": f"{val}", "message": "this is deny message"})