from typing import List, Optional
-from knot_resolver_manager.datamodel.types import IPAddressPort, TimeUnit
+from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema
+from knot_resolver_manager.datamodel.types import ActionEnum, DomainName, IPAddressPort, QTypeEnum, TimeUnit
from knot_resolver_manager.datamodel.view_schema import FlagsEnum
from knot_resolver_manager.utils import SchemaNode
-from knot_resolver_manager.utils.types import LiteralEnum
-
-# TODO: add all other options
-ActionEnum = LiteralEnum["pass", "deny", "mirror", "forward", "modify"]
class FilterSchema(SchemaNode):
- suffix: Optional[str] = None
- pattern: Optional[str] = None
- query_type: Optional[str] = None
+ domain: Optional[List[DomainName]] = None
+ suffix: Optional[List[str]] = None
+ pattern: Optional[List[str]] = None
+ qtype: Optional[List[QTypeEnum]] = None
class AnswerSchema(SchemaNode):
- query_type: str
+ qtype: QTypeEnum
rdata: str
ttl: TimeUnit = TimeUnit("1s")
no_data: bool = False
class PolicySchema(SchemaNode):
action: ActionEnum
- filters: Optional[List[FilterSchema]] = None
+ order: Optional[int] = None
+ filter: Optional[FilterSchema] = None
views: Optional[List[str]] = None
options: Optional[List[FlagsEnum]] = None
message: Optional[str] = None
- mirror: Optional[List[IPAddressPort]] = None
- forward: Optional[List[IPAddressPort]] = None
+ reroute: Optional[List[AddressRenumberingSchema]] = None
answer: Optional[AnswerSchema] = None
+ mirror: Optional[List[IPAddressPort]] = None
def _validate(self) -> None:
- pass
+ # checking for missing fields
+ if self.action == "reroute" and not self.reroute:
+ raise ValueError("missing mandatory field 'reroute' for 'reroute' action")
+ if self.action == "answer" and not self.answer:
+ raise ValueError("missing mandatory field 'answer' for 'answer' action")
+ if self.action == "mirror" and not self.mirror:
+ raise ValueError("missing mandatory field 'mirror' for 'mirror' action")
+
+ # checking for unnecessary fields
+ if self.message and not self.action == "deny":
+ raise ValueError("'message' field can only be defined for 'deny' action")
+ if self.reroute and not self.action == "reroute":
+ raise ValueError("'answer' field can only be defined for 'answer' action")
+ if self.answer and not self.action == "answer":
+ raise ValueError("'answer' field can only be defined for 'answer' action")
+ if self.mirror and not self.action == "mirror":
+ raise ValueError("'mirror' field can only be defined for 'mirror' action")
from typing_extensions import Literal
-from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, Listen, UncheckedPath
+from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, Listen, QTypeEnum, UncheckedPath
from knot_resolver_manager.exceptions import DataException
from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
class WatchDogSchema(SchemaNode):
qname: DomainName
- qtype: str
+ qtype: QTypeEnum
class ManagementSchema(SchemaNode):
from knot_resolver_manager.exceptions import SchemaException
from knot_resolver_manager.utils import CustomValueType, SchemaNode
from knot_resolver_manager.utils.modelling import Serializable
+from knot_resolver_manager.utils.types import LiteralEnum
logger = logging.getLogger(__name__)
+ActionEnum = LiteralEnum[
+ # Nonchain actions
+ "pass",
+ "deny",
+ "drop",
+ "refuse",
+ "tc",
+ "reroute",
+ "answer",
+ # Chain actions
+ "mirror",
+ "debug-always",
+ "debug-cache-miss",
+ "qtrace",
+ "reqtrace",
+]
+
+# TODO: add other query types
+QTypeEnum = LiteralEnum["A", "AAAA", "CNAME", "MX", "NS", "ptr", "CERT", "SRV", "TXT", "SOA"]
+
class Unit(CustomValueType):
_re: Pattern[str]
--- /dev/null
+from pytest import raises
+
+from knot_resolver_manager.datamodel.policy_schema import PolicySchema
+from knot_resolver_manager.exceptions import KresdManagerException
+
+
+def test_simple_actions():
+ assert PolicySchema({"action": "pass"})
+ assert PolicySchema({"action": "deny"})
+ assert PolicySchema({"action": "drop"})
+ assert PolicySchema({"action": "refuse"})
+ assert PolicySchema({"action": "tc"})
+ assert PolicySchema({"action": "debug-always"})
+ assert PolicySchema({"action": "debug-cache-miss"})
+ assert PolicySchema({"action": "qtrace"})
+ assert PolicySchema({"action": "reqtrace"})
+
+ with raises(KresdManagerException):
+ PolicySchema({"action": "invalid-action"})
+
+
+def test_deny_message():
+ assert PolicySchema({"action": "deny", "message": "this is deny message"})
+
+ with raises(KresdManagerException):
+ PolicySchema({"action": "pass", "message": "this is deny message"})
+
+
+def test_reroute():
+ assert PolicySchema({"action": "reroute", "reroute": [{"source": "192.0.2.0/24", "destination": "127.0.0.0"}]})
+
+ with raises(KresdManagerException):
+ PolicySchema({"action": "reroute"})
+ with raises(KresdManagerException):
+ PolicySchema({"action": "pass", "reroute": [{"source": "192.0.2.0/24", "destination": "127.0.0.0"}]})
+
+
+def test_answer():
+ assert PolicySchema({"action": "answer", "answer": {"qtype": "AAAA", "rdata": "::1"}})
+
+ with raises(KresdManagerException):
+ PolicySchema({"action": "answer"})
+ with raises(KresdManagerException):
+ PolicySchema({"action": "pass", "answer": {"qtype": "AAAA", "rdata": "::1"}})
+
+
+def test_mirror():
+ assert PolicySchema({"action": "mirror", "mirror": ["127.0.0.1@5353"]})
+
+ with raises(KresdManagerException):
+ PolicySchema({"action": "mirror"})
+ with raises(KresdManagerException):
+ PolicySchema({"action": "pass", "mirror": ["127.0.0.1@5353"]})