From: Aleš Date: Thu, 9 Dec 2021 01:16:30 +0000 (+0100) Subject: datamodel: policy: added policy actions X-Git-Tag: v6.0.0a1~69^2~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3771e7c1e807dfd335b5dc411e6399d932d800c7;p=thirdparty%2Fknot-resolver.git datamodel: policy: added policy actions - tests for policy actions - QTypeEnum: added some query-type literals --- diff --git a/manager/knot_resolver_manager/datamodel/policy_schema.py b/manager/knot_resolver_manager/datamodel/policy_schema.py index 07851188c..87f74e2b6 100644 --- a/manager/knot_resolver_manager/datamodel/policy_schema.py +++ b/manager/knot_resolver_manager/datamodel/policy_schema.py @@ -1,22 +1,20 @@ from typing import List, Optional -from knot_resolver_manager.datamodel.types import IPAddressPort, TimeUnit +from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema +from knot_resolver_manager.datamodel.types import ActionEnum, DomainName, IPAddressPort, QTypeEnum, TimeUnit from knot_resolver_manager.datamodel.view_schema import FlagsEnum from knot_resolver_manager.utils import SchemaNode -from knot_resolver_manager.utils.types import LiteralEnum - -# TODO: add all other options -ActionEnum = LiteralEnum["pass", "deny", "mirror", "forward", "modify"] class FilterSchema(SchemaNode): - suffix: Optional[str] = None - pattern: Optional[str] = None - query_type: Optional[str] = None + domain: Optional[List[DomainName]] = None + suffix: Optional[List[str]] = None + pattern: Optional[List[str]] = None + qtype: Optional[List[QTypeEnum]] = None class AnswerSchema(SchemaNode): - query_type: str + qtype: QTypeEnum rdata: str ttl: TimeUnit = TimeUnit("1s") no_data: bool = False @@ -24,13 +22,30 @@ class AnswerSchema(SchemaNode): class PolicySchema(SchemaNode): action: ActionEnum - filters: Optional[List[FilterSchema]] = None + order: Optional[int] = None + filter: Optional[FilterSchema] = None views: Optional[List[str]] = None options: Optional[List[FlagsEnum]] = None message: Optional[str] = None - mirror: Optional[List[IPAddressPort]] = None - forward: Optional[List[IPAddressPort]] = None + reroute: Optional[List[AddressRenumberingSchema]] = None answer: Optional[AnswerSchema] = None + mirror: Optional[List[IPAddressPort]] = None def _validate(self) -> None: - pass + # checking for missing fields + if self.action == "reroute" and not self.reroute: + raise ValueError("missing mandatory field 'reroute' for 'reroute' action") + if self.action == "answer" and not self.answer: + raise ValueError("missing mandatory field 'answer' for 'answer' action") + if self.action == "mirror" and not self.mirror: + raise ValueError("missing mandatory field 'mirror' for 'mirror' action") + + # checking for unnecessary fields + if self.message and not self.action == "deny": + raise ValueError("'message' field can only be defined for 'deny' action") + if self.reroute and not self.action == "reroute": + raise ValueError("'answer' field can only be defined for 'answer' action") + if self.answer and not self.action == "answer": + raise ValueError("'answer' field can only be defined for 'answer' action") + if self.mirror and not self.action == "mirror": + raise ValueError("'mirror' field can only be defined for 'mirror' action") diff --git a/manager/knot_resolver_manager/datamodel/server_schema.py b/manager/knot_resolver_manager/datamodel/server_schema.py index 0762977cb..668f75404 100644 --- a/manager/knot_resolver_manager/datamodel/server_schema.py +++ b/manager/knot_resolver_manager/datamodel/server_schema.py @@ -5,7 +5,7 @@ from typing import Any, Optional, Union from typing_extensions import Literal -from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, Listen, UncheckedPath +from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, Listen, QTypeEnum, UncheckedPath from knot_resolver_manager.exceptions import DataException from knot_resolver_manager.utils import SchemaNode from knot_resolver_manager.utils.types import LiteralEnum @@ -36,7 +36,7 @@ BackendEnum = LiteralEnum["auto", "systemd", "supervisord"] class WatchDogSchema(SchemaNode): qname: DomainName - qtype: str + qtype: QTypeEnum class ManagementSchema(SchemaNode): diff --git a/manager/knot_resolver_manager/datamodel/types.py b/manager/knot_resolver_manager/datamodel/types.py index bf892cfba..6c74416b7 100644 --- a/manager/knot_resolver_manager/datamodel/types.py +++ b/manager/knot_resolver_manager/datamodel/types.py @@ -8,9 +8,30 @@ from typing import Any, Dict, Optional, Pattern, Type, Union from knot_resolver_manager.exceptions import SchemaException from knot_resolver_manager.utils import CustomValueType, SchemaNode from knot_resolver_manager.utils.modelling import Serializable +from knot_resolver_manager.utils.types import LiteralEnum logger = logging.getLogger(__name__) +ActionEnum = LiteralEnum[ + # Nonchain actions + "pass", + "deny", + "drop", + "refuse", + "tc", + "reroute", + "answer", + # Chain actions + "mirror", + "debug-always", + "debug-cache-miss", + "qtrace", + "reqtrace", +] + +# TODO: add other query types +QTypeEnum = LiteralEnum["A", "AAAA", "CNAME", "MX", "NS", "ptr", "CERT", "SRV", "TXT", "SOA"] + class Unit(CustomValueType): _re: Pattern[str] diff --git a/manager/tests/unit/datamodel/test_policy_schema.py b/manager/tests/unit/datamodel/test_policy_schema.py new file mode 100644 index 000000000..8755fd035 --- /dev/null +++ b/manager/tests/unit/datamodel/test_policy_schema.py @@ -0,0 +1,53 @@ +from pytest import raises + +from knot_resolver_manager.datamodel.policy_schema import PolicySchema +from knot_resolver_manager.exceptions import KresdManagerException + + +def test_simple_actions(): + assert PolicySchema({"action": "pass"}) + assert PolicySchema({"action": "deny"}) + assert PolicySchema({"action": "drop"}) + assert PolicySchema({"action": "refuse"}) + assert PolicySchema({"action": "tc"}) + assert PolicySchema({"action": "debug-always"}) + assert PolicySchema({"action": "debug-cache-miss"}) + assert PolicySchema({"action": "qtrace"}) + assert PolicySchema({"action": "reqtrace"}) + + with raises(KresdManagerException): + PolicySchema({"action": "invalid-action"}) + + +def test_deny_message(): + assert PolicySchema({"action": "deny", "message": "this is deny message"}) + + with raises(KresdManagerException): + PolicySchema({"action": "pass", "message": "this is deny message"}) + + +def test_reroute(): + assert PolicySchema({"action": "reroute", "reroute": [{"source": "192.0.2.0/24", "destination": "127.0.0.0"}]}) + + with raises(KresdManagerException): + PolicySchema({"action": "reroute"}) + with raises(KresdManagerException): + PolicySchema({"action": "pass", "reroute": [{"source": "192.0.2.0/24", "destination": "127.0.0.0"}]}) + + +def test_answer(): + assert PolicySchema({"action": "answer", "answer": {"qtype": "AAAA", "rdata": "::1"}}) + + with raises(KresdManagerException): + PolicySchema({"action": "answer"}) + with raises(KresdManagerException): + PolicySchema({"action": "pass", "answer": {"qtype": "AAAA", "rdata": "::1"}}) + + +def test_mirror(): + assert PolicySchema({"action": "mirror", "mirror": ["127.0.0.1@5353"]}) + + with raises(KresdManagerException): + PolicySchema({"action": "mirror"}) + with raises(KresdManagerException): + PolicySchema({"action": "pass", "mirror": ["127.0.0.1@5353"]})