from typing import List, Optional, Union
-from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, IPAddressPort
-from knot_resolver_manager.datamodel.view_schema import FlagsEnum
+from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, FlagsEnum, IPAddressPort
from knot_resolver_manager.utils import SchemaNode
class InterfaceSchema(SchemaNode):
+ class Raw(SchemaNode):
+ listen: Listen
+ kind: KindEnum = "dns"
+ freebind: bool = False
+
+ _PREVIOUS_SCHEMA = Raw
+
listen: Listen
- kind: KindEnum = "dns"
- freebind: bool = False
+ kind: KindEnum
+ freebind: bool
+
+ def _listen(self, origin: Raw):
+ if not origin.listen.port:
+ if origin.kind == "dot":
+ origin.listen.port = 853
+ elif origin.kind == "doh":
+ origin.listen.port = 443
+ else:
+ origin.listen.port = 53
+ return origin.listen
class EdnsBufferSizeSchema(SchemaNode):
from typing import List, Optional
from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema
-from knot_resolver_manager.datamodel.types import ActionEnum, IPAddressPort, RecordTypeEnum, TimeUnit
-from knot_resolver_manager.datamodel.view_schema import FlagsEnum
+from knot_resolver_manager.datamodel.types import ActionEnum, FlagsEnum, IPAddressPort, RecordTypeEnum, TimeUnit
from knot_resolver_manager.utils import SchemaNode
from typing import List, Optional
from knot_resolver_manager.datamodel.policy_schema import ActionEnum
-from knot_resolver_manager.datamodel.types import CheckedPath
-from knot_resolver_manager.datamodel.view_schema import FlagsEnum
+from knot_resolver_manager.datamodel.types import CheckedPath, FlagsEnum
from knot_resolver_manager.utils import SchemaNode
from typing import List, Optional, Union
-from knot_resolver_manager.datamodel.types import IPAddressPort
-from knot_resolver_manager.datamodel.view_schema import FlagsEnum
+from knot_resolver_manager.datamodel.types import FlagsEnum, IPAddressPort
from knot_resolver_manager.utils import SchemaNode
{% macro net_listen(interface) -%}
-net.listen('{{ interface.listen.ip|string }}', {{ interface.listen.port|int }}, { kind='{{ 'tls' if interface.kind == 'dot' else interface.kind }}', freebind={{ 'true' if interface.freebind else 'false'}} })
+net.listen(
+{%- if interface.listen.ip -%}
+'{{ interface.listen.ip|string }}',
+{%- if interface.listen.port -%}
+{{ interface.listen.port|int }},
+{%- endif -%}
+{%- elif interface.listen.unix_socket -%}
+'{{ interface.listen.unix_socket|string }}',nil,
+{%- elif interface.listen.interface -%}
+net.{{ interface.listen.interface|string }},
+{%- if interface.listen.port -%}
+{{ interface.listen.port|int }},
+{%- endif -%}
+{%- endif -%}
+{kind='{{ 'tls' if interface.kind == 'dot' else interface.kind }}',freebind={{ 'true' if interface.freebind else 'false'}}})
{%- endmacro %}
\ No newline at end of file
"reqtrace",
]
+# FLAGS from https://knot-resolver.readthedocs.io/en/stable/lib.html?highlight=options#c.kr_qflags
+FlagsEnum = LiteralEnum[
+ "no-minimize",
+ "no-ipv4",
+ "no-ipv6",
+ "tcp",
+ "resolved",
+ "await-ipv4",
+ "await-ipv6",
+ "await-cut",
+ "no-edns",
+ "cached",
+ "no-cache",
+ "expiring",
+ "allow_local",
+ "dnssec-want",
+ "dnssec-bogus",
+ "dnssec-insecure",
+ "dnssec-cd",
+ "stub",
+ "always-cut",
+ "dnssec-wexpand",
+ "permissive",
+ "strict",
+ "badcookie-again",
+ "cname",
+ "reorder-rr",
+ "trace",
+ "no-0x20",
+ "dnssec-nods",
+ "dnssec-optout",
+ "nonauth",
+ "forward",
+ "dns64-mark",
+ "cache-tried",
+ "no-ns-found",
+ "pkt-is-sane",
+ "dns64-disable",
+]
+
# DNS record types from 'kres.type' table
RecordTypeEnum = LiteralEnum[
"A",
class ListenType(Enum):
- IP_AND_PORT = auto()
+ IP = auto()
UNIX_SOCKET = auto()
- INTERFACE_AND_PORT = auto()
+ INTERFACE = auto()
class Listen(SchemaNode, Serializable):
"interface" if origin.interface is not None else ...,
}
- if present == {"ip", "port", ...}:
- return ListenType.IP_AND_PORT
+ if present == {"ip", ...} or present == {"ip", "port", ...}:
+ return ListenType.IP
elif present == {"unix_socket", ...}:
return ListenType.UNIX_SOCKET
- elif present == {"interface", "port", ...}:
- return ListenType.INTERFACE_AND_PORT
+ elif present == {"interface", ...} or present == {"interface", "port", ...}:
+ return ListenType.INTERFACE
else:
raise ValueError(
"Listen configuration contains multiple incompatible options at once. "
pass
def __str__(self) -> str:
- if self.typ is ListenType.IP_AND_PORT:
+ if self.typ is ListenType.IP:
return f"{self.ip} @ {self.port}"
elif self.typ is ListenType.UNIX_SOCKET:
return f"{self.unix_socket}"
- elif self.typ is ListenType.INTERFACE_AND_PORT:
+ elif self.typ is ListenType.INTERFACE:
return f"{self.interface} @ {self.port}"
else:
raise NotImplementedError()
)
def to_dict(self) -> Dict[Any, Any]:
- if self.typ is ListenType.IP_AND_PORT:
+ if self.typ is ListenType.IP:
return {"port": self.port, "ip": str(self.ip)}
elif self.typ is ListenType.UNIX_SOCKET:
return {"unix_socket": str(self.unix_socket)}
- elif self.typ is ListenType.INTERFACE_AND_PORT:
+ elif self.typ is ListenType.INTERFACE:
return {"interface": self.interface, "port": self.port}
else:
raise NotImplementedError()
from typing import List, Optional
-from knot_resolver_manager.datamodel.types import IPNetwork
+from knot_resolver_manager.datamodel.types import FlagsEnum, IPNetwork
from knot_resolver_manager.utils import SchemaNode
-from knot_resolver_manager.utils.types import LiteralEnum
-
-# FLAGS from https://knot-resolver.readthedocs.io/en/stable/lib.html?highlight=options#c.kr_qflags
-FlagsEnum = LiteralEnum[
- "no-minimize",
- "no-ipv4",
- "no-ipv6",
- "tcp",
- "resolved",
- "await-ipv4",
- "await-ipv6",
- "await-cut",
- "no-edns",
- "cached",
- "no-cache",
- "expiring",
- "allow_local",
- "dnssec-want",
- "dnssec-bogus",
- "dnssec-insecure",
- "dnssec-cd",
- "stub",
- "always-cut",
- "dnssec-wexpand",
- "permissive",
- "strict",
- "badcookie-again",
- "cname",
- "reorder-rr",
- "trace",
- "no-0x20",
- "dnssec-nods",
- "dnssec-optout",
- "nonauth",
- "forward",
- "dns64-mark",
- "cache-tried",
- "no-ns-found",
- "pkt-is-sane",
- "dns64-disable",
-]
class ViewSchema(SchemaNode):
if mgn.listen.typ is ListenType.UNIX_SOCKET:
nsite = web.UnixSite(self.runner, str(mgn.listen.unix_socket))
logger.info(f"Starting API HTTP server on http+unix://{mgn.listen.unix_socket}")
- elif mgn.listen.typ is ListenType.IP_AND_PORT:
+ elif mgn.listen.typ is ListenType.IP:
nsite = web.TCPSite(self.runner, str(mgn.listen.ip), mgn.listen.port)
logger.info(f"Starting API HTTP server on http://{mgn.listen.ip}:{mgn.listen.port}")
else:
if self.listen is not None and self.site is not None:
if self.listen.typ is ListenType.UNIX_SOCKET:
logger.info(f"Stopping API HTTP server on http+unix://{mgn.listen.unix_socket}")
- elif mgn.listen.typ is ListenType.IP_AND_PORT:
+ elif mgn.listen.typ is ListenType.IP:
logger.info(f"Stopping API HTTP server on http://{mgn.listen.ip}:{mgn.listen.port}")
await self.site.stop()
o = Listen({"interface": "eth0", "port": 56})
- assert o.typ == ListenType.INTERFACE_AND_PORT
+ assert o.typ == ListenType.INTERFACE
assert o.ip is None
assert o.port == 56
assert o.unix_socket is None
assert o.interface == "eth0"
- o = Listen({"ip": "123.4.5.6", "port": 56})
+ o = Listen({"ip": "123.4.5.6"})
- assert o.typ == ListenType.IP_AND_PORT
+ assert o.typ == ListenType.IP
assert o.ip == IPv4Address("123.4.5.6")
- assert o.port == 56
+ assert o.port == None
assert o.unix_socket is None
assert o.interface is None
# check failure
with raises(KresdManagerException):
Listen({"unix-socket": "/tmp", "ip": "127.0.0.1"})
+ with raises(KresdManagerException):
+ Listen({"unix-socket": "/tmp", "port": 853})
def test_network():