From 84e10664e84898dd10a346fa4b2c1e0175a36eff Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20Mr=C3=A1zek?= Date: Fri, 23 Feb 2024 19:22:22 +0100 Subject: [PATCH] datamodel: forward: custom port and TLS are not supported for authoritative servers --- .../datamodel/forward_schema.py | 23 +++++++- .../unit/datamodel/test_forward_schema.py | 57 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 manager/tests/unit/datamodel/test_forward_schema.py diff --git a/manager/knot_resolver_manager/datamodel/forward_schema.py b/manager/knot_resolver_manager/datamodel/forward_schema.py index 8b4254265..ee5206c27 100644 --- a/manager/knot_resolver_manager/datamodel/forward_schema.py +++ b/manager/knot_resolver_manager/datamodel/forward_schema.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Union +from typing import Any, List, Optional, Union from typing_extensions import Literal @@ -55,3 +55,24 @@ class ForwardSchema(ConfigSchema): subtree: ListOrItem[DomainName] servers: Union[List[IPAddressOptionalPort], List[ForwardServerSchema]] options: ForwardOptionsSchema = ForwardOptionsSchema() + + def _validate(self) -> None: + def is_port_custom(servers: List[Any]) -> bool: + for server in servers: + if isinstance(server, IPAddressOptionalPort) and server.port: + return int(server.port) != 53 + elif isinstance(server, ForwardServerSchema): + return is_port_custom(server.address.to_std()) + return False + + def is_transport_tls(servers: List[Any]) -> bool: + for server in servers: + if isinstance(server, ForwardServerSchema): + return server.transport == "tls" + return False + + if self.options.authoritative and is_port_custom(self.servers): + raise ValueError("Forwarding to authoritative servers on a custom port is currently not supported.") + + if self.options.authoritative and is_transport_tls(self.servers): + raise ValueError("Forwarding to authoritative servers using TLS protocol is not supported.") diff --git a/manager/tests/unit/datamodel/test_forward_schema.py b/manager/tests/unit/datamodel/test_forward_schema.py new file mode 100644 index 000000000..9ae77fe86 --- /dev/null +++ b/manager/tests/unit/datamodel/test_forward_schema.py @@ -0,0 +1,57 @@ +import pytest +from pytest import raises + +from knot_resolver_manager.datamodel.forward_schema import ForwardSchema +from knot_resolver_manager.utils.modeling.exceptions import DataValidationError + + +@pytest.mark.parametrize("port,auth", [(5353, False), (53, True)]) +def test_forward_valid(port: int, auth: bool): + assert ForwardSchema( + {"subtree": ".", "options": {"authoritative": auth, "dnssec": True}, "servers": [f"127.0.0.1", "::1"]} + ) + assert ForwardSchema( + {"subtree": ".", "options": {"authoritative": auth, "dnssec": False}, "servers": [f"127.0.0.1@{port}", "::1"]} + ) + + assert ForwardSchema( + { + "subtree": ".", + "options": {"authoritative": auth, "dnssec": False}, + "servers": [{"address": [f"127.0.0.1@{port}", "::1"]}], + } + ) + + assert ForwardSchema( + { + "subtree": ".", + "options": {"authoritative": auth, "dnssec": False}, + "servers": [{"address": [f"127.0.0.1", "::1"]}], + } + ) + + +@pytest.mark.parametrize( + "port,auth,tls", + [(5353, True, False), (53, True, True)], +) +def test_forward_invalid(port: int, auth: bool, tls: bool): + + if not tls: + with raises(DataValidationError): + ForwardSchema( + { + "subtree": ".", + "options": {"authoritative": auth, "dnssec": False}, + "servers": [f"127.0.0.1@{port}", "::1"], + } + ) + + with raises(DataValidationError): + ForwardSchema( + { + "subtree": ".", + "options": {"authoritative": auth, "dnssec": False}, + "servers": [{"address": [f"127.0.0.1{port}", f"::1{port}"], "transport": "tls" if tls else None}], + } + ) -- 2.47.2