import pkgutil
-from typing import Any, Text, Union
+from typing import Text, Union
from jinja2 import Environment, Template
+from typing_extensions import Literal
from knot_resolver_manager.datamodel.dns64_config import Dns64
from knot_resolver_manager.datamodel.dnssec_config import Dnssec
class KresConfig(SchemaNode):
- server: Server = Server()
- options: Options = Options()
- network: Network = Network()
- dnssec: Union[bool, Dnssec] = True
- dns64: Union[bool, Dns64] = False
- lua: Lua = Lua()
-
- def _dnssec(self, obj: Any) -> Union[bool, Dnssec]:
- if "dnssec" not in obj or obj["dnssec"] is True:
+ class Raw(SchemaNode):
+ server: Server = Server()
+ options: Options = Options()
+ network: Network = Network()
+ dnssec: Union[bool, Dnssec] = True
+ dns64: Union[bool, Dns64] = False
+ lua: Lua = Lua()
+
+ _PREVIOUS_SCHEMA = Raw
+
+ server: Server
+ options: Options
+ network: Network
+ dnssec: Union[Literal[False], Dnssec]
+ dns64: Union[Literal[False], Dns64]
+ lua: Lua
+
+ def _dnssec(self, obj: Raw) -> Union[Literal[False], Dnssec]:
+ if obj.dnssec is True:
return Dnssec()
- return obj["dnssec"]
+ return obj.dnssec
- def _dns64(self, obj: Any) -> Union[bool, Dns64]:
- if "dns64" not in obj or obj["dns64"] is True:
+ def _dns64(self, obj: Raw) -> Union[Literal[False], Dns64]:
+ if obj.dns64 is True:
return Dns64()
- return obj["dns64"]
+ return obj.dns64
def render_lua(self) -> Text:
return _LUA_TEMPLATE.render(cfg=self)
def _validate(self) -> None:
if self.script and self.script_file:
- raise DataException("'lua.script' and 'lua.script-file' are both defined, only one can be used")
+ raise DataException("'script' and 'script-file' are both defined, only one can be used")
from typing import List
+from knot_resolver_manager.datamodel.types import Listen
from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
KindEnum = LiteralEnum["dns", "xdp", "dot", "doh"]
-class _Interface(SchemaNode):
- listen: str
+class Interface(SchemaNode):
+ listen: Listen
kind: KindEnum = "dns"
freebind: bool = False
-class Interface(SchemaNode):
- _PREVIOUS_SCHEMA = _Interface
-
- address: str
- port: int
- kind: str
- freebind: bool
-
- def _address(self, obj: _Interface) -> str:
- if "@" in obj.listen:
- address = obj.listen.split("@", maxsplit=1)[0]
- return address
- return obj.listen
-
- def _port(self, obj: _Interface) -> int:
- port_map = {"dns": 53, "xdp": 53, "dot": 853, "doh": 443}
- if "@" in obj.listen:
- port = obj.listen.split("@", maxsplit=1)[1]
- return int(port)
- return port_map.get(obj.kind, 0)
-
-
class Network(SchemaNode):
- interfaces: List[Interface] = [Interface({"listen": "127.0.0.1"}), Interface({"listen": "::1", "freebind": True})]
+ interfaces: List[Interface] = [
+ Interface({"listen": {"ip": "127.0.0.1", "port": 53}}),
+ Interface({"listen": {"ip": "::1", "port": 53}, "freebind": True}),
+ ]
from typing import Any, Union
+from typing_extensions import Literal
+
from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
class Options(SchemaNode):
- glue_checking: GlueCheckingEnum = "normal"
- qname_minimisation: bool = True
- query_loopback: bool = False
- reorder_rrset: bool = True
- query_case_randomization: bool = True
- query_priming: bool = True
- rebinding_protection: bool = False
- refuse_no_rd: bool = True
- time_jump_detection: bool = True
- violators_workarounds: bool = False
- serve_stale: bool = False
-
- prediction: Union[bool, Prediction] = False
-
- def _prediction(self, obj: Any) -> Union[bool, Prediction]:
- if obj["prediction"] is True:
+ class Raw(SchemaNode):
+ glue_checking: GlueCheckingEnum = "normal"
+ qname_minimisation: bool = True
+ query_loopback: bool = False
+ reorder_rrset: bool = True
+ query_case_randomization: bool = True
+ query_priming: bool = True
+ rebinding_protection: bool = False
+ refuse_no_rd: bool = True
+ time_jump_detection: bool = True
+ violators_workarounds: bool = False
+ serve_stale: bool = False
+ prediction: Union[bool, Prediction] = False
+
+ _PREVIOUS_SCHEMA = Raw
+
+ glue_checking: GlueCheckingEnum
+ qname_minimisation: bool
+ query_loopback: bool
+ reorder_rrset: bool
+ query_case_randomization: bool
+ query_priming: bool
+ rebinding_protection: bool
+ refuse_no_rd: bool
+ time_jump_detection: bool
+ violators_workarounds: bool
+ serve_stale: bool
+ prediction: Union[Literal[False], Prediction]
+
+ def _prediction(self, obj: Raw) -> Any:
+ if obj.prediction is True:
return Prediction()
- return obj["prediction"]
+ return obj.prediction
key_file: Optional[AnyPath] = None
-class _ServerRaw(SchemaNode):
- hostname: Optional[str] = None
- groupid: Optional[str] = None
- nsid: Optional[str]
- workers: Union[Literal["auto"], int] = 1
- use_cache_gc: bool = True
-
- management: Management = Management()
- webmgmt: Optional[Webmgmt] = None
-
-
class Server(SchemaNode):
- _PREVIOUS_SCHEMA = _ServerRaw
+ class Raw(SchemaNode):
+ hostname: Optional[str] = None
+ groupid: Optional[str] = None
+ nsid: Optional[str] = None
+ workers: Union[Literal["auto"], int] = 1
+ use_cache_gc: bool = True
+ management: Management = Management()
+ webmgmt: Optional[Webmgmt] = None
+
+ _PREVIOUS_SCHEMA = Raw
hostname: str
groupid: Optional[str]
nsid: Optional[str]
workers: int
use_cache_gc: bool
-
management: Management
webmgmt: Optional[Webmgmt]
- def _hostname(self, obj: Any) -> str:
- if isinstance(obj["hostname"], str):
- return obj["hostname"]
- elif obj["hostname"] is None:
+ def _hostname(self, obj: Raw) -> Any:
+ if obj.hostname is None:
return socket.gethostname()
- raise ValueError(f"Unexpected value for 'server.hostname': {obj['hostname']}")
+ return obj.hostname
- def _workers(self, obj: Any) -> int:
- if isinstance(obj["workers"], int):
- return obj["workers"]
- elif obj["workers"] == "auto":
+ def _workers(self, obj: Raw) -> Any:
+ if obj.workers == "auto":
return _cpu_count()
- raise ValueError(f"Unexpected value for 'server.workers': {obj['workers']}")
+ return obj.workers
def _validate(self) -> None:
if self.workers < 0:
if hasattr(self, f"_{name}"):
# check, that the schema makes sense
raise TypeError(
- f"{cls.__name__}.{name}: can't have both default value and transformation function at once. Use _PREVIOUS_SCHEMA..."
+ f"{cls.__name__}.{name}: can't have both default value and transformation function at once."
+ "Use _PREVIOUS_SCHEMA..."
)
return used_keys
--- /dev/null
+import ipaddress
+
+from knot_resolver_manager.datamodel.network_config import Network
+
+
+def test_interfaces_default():
+ o = Network()
+
+ assert len(o.interfaces) == 2
+ # {"listen": {"ip": "127.0.0.1", "port": 53}}
+ assert o.interfaces[0].listen.ip == ipaddress.ip_address("127.0.0.1")
+ assert o.interfaces[0].listen.port == 53
+ assert o.interfaces[0].kind == "dns"
+ assert o.interfaces[0].freebind == False
+ # {"listen": {"ip": "::1", "port": 53}, "freebind": True}
+ assert o.interfaces[1].listen.ip == ipaddress.ip_address("::1")
+ assert o.interfaces[1].listen.port == 53
+ assert o.interfaces[1].kind == "dns"
+ assert o.interfaces[1].freebind == True