workers: 1
rundir: etc/knot-resolver/runtime
management:
- listen:
- ip: 127.0.0.1
- port: 5000
\ No newline at end of file
+ ip-address: 127.0.0.1@5000
\ No newline at end of file
from typing import List, Optional, Union
-from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, FlagsEnum, IPAddressPort
+from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, FlagsEnum, IPAddressOptionalPort
from knot_resolver_manager.utils import SchemaNode
class ForwardServerSchema(SchemaNode):
- address: IPAddressPort
+ address: IPAddressOptionalPort
pin_sha256: Optional[Union[str, List[str]]] = None
hostname: Optional[DomainName] = None
ca_file: Optional[CheckedPath] = None
class ForwardZoneSchema(SchemaNode):
tls: bool = False
- servers: Union[List[IPAddressPort], List[ForwardServerSchema]]
+ servers: Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]
views: Optional[List[str]] = None
options: Optional[List[FlagsEnum]] = None
-import os
from typing import List, Optional, Union
from typing_extensions import Literal
from knot_resolver_manager.datamodel.types import (
CheckedPath,
- InterfacePort,
+ InterfaceOptionalPort,
IPAddress,
- IPAddressPort,
+ IPAddressOptionalPort,
IPNetwork,
IPv4Address,
IPv6Address,
class ListenSchema(SchemaNode):
class Raw(SchemaNode):
unix_socket: Union[None, CheckedPath, List[CheckedPath]] = None
- ip_address: Union[None, IPAddressPort, List[IPAddressPort]] = None
- interface: Union[None, InterfacePort, List[InterfacePort]] = None
+ ip_address: Union[None, IPAddressOptionalPort, List[IPAddressOptionalPort]] = None
+ interface: Union[None, InterfaceOptionalPort, List[InterfaceOptionalPort]] = None
port: Optional[PortNumber] = None
kind: KindEnum = "dns"
freebind: bool = False
_PREVIOUS_SCHEMA = Raw
unix_socket: Union[None, CheckedPath, List[CheckedPath]]
- ip_address: Union[None, IPAddressPort, List[IPAddressPort]]
- interface: Union[None, InterfacePort, List[InterfacePort]]
+ ip_address: Union[None, IPAddressOptionalPort, List[IPAddressOptionalPort]]
+ interface: Union[None, InterfaceOptionalPort, List[InterfaceOptionalPort]]
port: Optional[PortNumber]
kind: KindEnum
freebind: bool
- def _ip_address(self, origin: Raw) -> Union[None, IPAddressPort, List[IPAddressPort]]:
+ def _ip_address(self, origin: Raw) -> Union[None, IPAddressOptionalPort, List[IPAddressOptionalPort]]:
if isinstance(origin.ip_address, list):
port_set: Optional[bool] = None
for addr in origin.ip_address:
raise ValueError("The port number is defined in two places ('port' option and '@<port>' syntax).")
if port_set is not None and (bool(addr.port) != port_set):
raise ValueError(
- "The port number specified by '@<port>' syntax must or must not be used for each IP address."
+ "The '@<port>' syntax must be used either for all or none of the IP addresses in the list."
)
port_set = True if addr.port else False
- elif isinstance(origin.ip_address, IPAddressPort) and origin.ip_address.port and origin.port:
+ elif isinstance(origin.ip_address, IPAddressOptionalPort) and origin.ip_address.port and origin.port:
raise ValueError("The port number is defined in two places ('port' option and '@<port>' syntax).")
return origin.ip_address
- def _interface(self, origin: Raw) -> Union[None, InterfacePort, List[InterfacePort]]:
+ def _interface(self, origin: Raw) -> Union[None, InterfaceOptionalPort, List[InterfaceOptionalPort]]:
if isinstance(origin.interface, list):
port_set: Optional[bool] = None
for intrfc in origin.interface:
raise ValueError("The port number is defined in two places ('port' option and '@<port>' syntax).")
if port_set is not None and (bool(intrfc.port) != port_set):
raise ValueError(
- "The port number specified by '@<port>' syntax must or must not be used for each interface."
+ "The '@<port>' syntax must be used either for all or none of the interface in the list."
)
port_set = True if intrfc.port else False
- elif isinstance(origin.interface, InterfacePort) and origin.interface.port and origin.port:
+ elif isinstance(origin.interface, InterfaceOptionalPort) and origin.interface.port and origin.port:
raise ValueError("The port number is defined in two places ('port' option and '@<port>' syntax).")
return origin.interface
from typing import List, Optional
from knot_resolver_manager.datamodel.network_schema import AddressRenumberingSchema
-from knot_resolver_manager.datamodel.types import ActionEnum, FlagsEnum, IPAddressPort, RecordTypeEnum, TimeUnit
+from knot_resolver_manager.datamodel.types import ActionEnum, FlagsEnum, IPAddressOptionalPort, RecordTypeEnum, TimeUnit
from knot_resolver_manager.utils import SchemaNode
message: Optional[str] = None
reroute: Optional[List[AddressRenumberingSchema]] = None
answer: Optional[AnswerSchema] = None
- mirror: Optional[List[IPAddressPort]] = None
+ mirror: Optional[List[IPAddressOptionalPort]] = None
def _validate(self) -> None:
# checking for missing fields
from typing_extensions import Literal
-from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, Listen, RecordTypeEnum, UncheckedPath
+from knot_resolver_manager.datamodel.types import (
+ CheckedPath,
+ DomainName,
+ InterfacePort,
+ IPAddressPort,
+ RecordTypeEnum,
+ UncheckedPath,
+)
from knot_resolver_manager.exceptions import DataException
from knot_resolver_manager.utils import SchemaNode
class ManagementSchema(SchemaNode):
- """
- Management API configuration.
-
- ---
- listen: Specifies where does the manager listen with its API. Can't be changed in runtime!
- """
+ unix_socket: Optional[CheckedPath] = None
+ ip_address: Optional[IPAddressPort] = None
- listen: Listen = Listen({"unix-socket": "./manager.sock"})
+ def _validate(self) -> None:
+ if bool(self.unix_socket) == bool(self.ip_address):
+ raise ValueError("One of 'ip-address' or 'unix-socket' must be configured..")
class WebmgmtSchema(SchemaNode):
- listen: Listen
+ unix_socket: Optional[CheckedPath] = None
+ ip_address: Optional[IPAddressPort] = None
+ interface: Optional[InterfacePort] = None
tls: bool = False
cert_file: Optional[CheckedPath] = None
key_file: Optional[CheckedPath] = None
+ def _validate(self) -> None:
+ present = {
+ "ip_address" if self.ip_address is not None else ...,
+ "unix_socket" if self.unix_socket is not None else ...,
+ "interface" if self.interface is not None else ...,
+ }
+ if not (present == {"ip_address", ...} or present == {"unix_socket", ...} or present == {"interface", ...}):
+ raise ValueError(
+ "Listen configuration contains multiple incompatible options at once. "
+ "One of 'ip-address', 'interface' or 'unix-socket' must be configured."
+ )
+
class ServerSchema(SchemaNode):
"""
backend: BackendEnum = "auto"
watchdog: Union[bool, WatchDogSchema] = True
rundir: UncheckedPath = UncheckedPath(".")
- management: ManagementSchema = ManagementSchema()
+ management: ManagementSchema = ManagementSchema({"unix-socket": "./manager.sock"})
webmgmt: Optional[WebmgmtSchema] = None
_PREVIOUS_SCHEMA = Raw
from typing import List, Optional, Union
-from knot_resolver_manager.datamodel.types import FlagsEnum, IPAddressPort
+from knot_resolver_manager.datamodel.types import FlagsEnum, IPAddressOptionalPort
from knot_resolver_manager.utils import SchemaNode
class StubServerSchema(SchemaNode):
- address: IPAddressPort
+ address: IPAddressOptionalPort
class StubZoneSchema(SchemaNode):
- servers: Union[List[IPAddressPort], List[StubServerSchema]]
+ servers: Union[List[IPAddressOptionalPort], List[StubServerSchema]]
views: Optional[List[str]] = None
options: Optional[List[FlagsEnum]] = None
{%- endmacro %}
-{% macro net_listen_unix_socket(socket, kind, freebind) -%}
-net.listen('{{ socket }}',nil,{kind={{ listen_kind(kind) }},freebind={{ 'true' if freebind else 'false'}}})
+{% macro net_listen_unix_socket(path, kind, freebind) -%}
+net.listen('{{ path }}',nil,{kind={{ listen_kind(kind) }},freebind={{ 'true' if freebind else 'false'}}})
{%- endmacro %}
{% macro net_listen_interface(interface, kind, freebind, port) -%}
-net.listen({{ listen_interface(interface.intrfc) }},
+net.listen({{ listen_interface(interface.if_name) }},
{%- if interface.port -%}
{{ interface.port }},
{%- else -%}
{% macro network_listen(listen) -%}
{%- if listen.unix_socket -%}
{%- if listen.unix_socket is iterable-%}
- {% for socket in listen.unix_socket -%}
- {{ net_listen_unix_socket(socket, listen.kind, listen.freebind) }}
+ {% for path in listen.unix_socket -%}
+ {{ net_listen_unix_socket(path, listen.kind, listen.freebind) }}
{% endfor -%}
{%- else -%}
{{ net_listen_unix_socket(listen.unix_socket, listen.kind, listen.freebind) }}
modules.unload('watchdog')
{%- endif %}
-{% if cfg.server.webmgmt %}
+{% if cfg.server.webmgmt -%}
-- server.webmgmt
modules.load('http')
http.config({
{{ "key = '"+cfg.server.webmgmt.key_file+"'," if cfg.server.webmgmt.key_file }}
}, 'webmgmt')
net.listen(
-{% if cfg.server.webmgmt.listen.ip %}
- '{{ cfg.server.webmgmt.listen.ip }}',
-{% elif cfg.server.webmgmt.listen.unix_socket %}
- '{{ cfg.server.webmgmt.listen.unix_socket }}',
-{% elif cfg.server.webmgmt.listen.interface %}
- net.{{ cfg.server.webmgmt.listen.interface }},
-{% endif %}
- {{ cfg.server.webmgmt.listen.port|string if cfg.server.webmgmt.listen.port else 'nil' }},
- { kind = 'webmgmt' })
-{% endif %}
\ No newline at end of file
+{%- if cfg.server.webmgmt.unix_socket -%}
+ '{{ cfg.server.webmgmt.unix_socket }}',nil,
+{%- elif cfg.server.webmgmt.ip_address -%}
+ '{{ cfg.server.webmgmt.ip_address.addr }}',{{ cfg.server.webmgmt.ip_address.port }},
+{%- elif cfg.server.webmgmt.interface -%}
+ net.{{ cfg.server.webmgmt.interface.if_name }},{{ cfg.server.webmgmt.interface.port }},
+{%- endif -%}
+{ kind = 'webmgmt' })
+{%- endif %}
\ No newline at end of file
import ipaddress
import logging
-import os
import re
from enum import Enum, auto
from pathlib import Path
]
-class IntRange(CustomValueType):
+class _IntCustomBase(CustomValueType):
+ """
+ Base class to work with integer value that is intended as a basis for other custom types.
+ """
+
_value: int
+
+ def __int__(self) -> int:
+ return self._value
+
+ def __str__(self) -> str:
+ return str(self._value)
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, _IntCustomBase) and o._value == self._value
+
+ def serialize(self) -> Any:
+ return self._value
+
+ @classmethod
+ def json_schema(cls: Type["_IntCustomBase"]) -> Dict[Any, Any]:
+ return {"type": "integer"}
+
+
+class _StrCustomBase(CustomValueType):
+ """
+ Base class to work with string value that is intended as a basis for other custom types.
+ """
+
+ _value: str
+
+ def __str__(self) -> str:
+ return self._value
+
+ def to_std(self) -> str:
+ return self._value
+
+ def __hash__(self) -> int:
+ return hash(self._value)
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, _StrCustomBase) and o._value == self._value
+
+ def serialize(self) -> Any:
+ return self._value
+
+ @classmethod
+ def json_schema(cls: Type["_StrCustomBase"]) -> Dict[Any, Any]:
+ return {"type": "string"}
+
+
+class _IntRangeBase(_IntCustomBase):
+ """
+ Base class to work with integer value ranges that is intended as a basis for other custom types.
+ Just inherit the class and set the values for _min and _max.
+
+ class CustomIntRange(_IntRangeBase):
+ _min: int = 0
+ _max: int = 10_000
+ """
+
_min: int
_max: int
object_path,
)
- def __int__(self) -> int:
- return self._value
+ @classmethod
+ def json_schema(cls: Type["_IntRangeBase"]) -> Dict[Any, Any]:
+ return {"type": "integer", "minimum": cls._min, "maximum": cls._max}
- def __str__(self) -> str:
- return str(self._value)
- def __eq__(self, o: object) -> bool:
- return isinstance(o, IntRange) and o._value == self._value
+class _PatternBase(_StrCustomBase):
+ """
+ Base class to work with string value that match regex pattern.
+ Just inherit the class and set pattern for _re.
- def serialize(self) -> Any:
- return self._value
+ class ABPattern(_PatternBase):
+ _re: Pattern[str] = r"ab*"
+ """
+
+ _re: Pattern[str]
+
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
+ super().__init__(source_value)
+ if isinstance(source_value, str):
+ if type(self)._re.match(source_value):
+ self._value: str = source_value
+ else:
+ raise SchemaException(f"'{source_value}' does not match '{self._re.pattern}' pattern", object_path)
+ else:
+ raise SchemaException(
+ f"Unexpected input type for string pattern - {type(source_value)}."
+ "Cause might be invalid format or invalid type.",
+ object_path,
+ )
@classmethod
- def json_schema(cls: Type["IntRange"]) -> Dict[Any, Any]:
- return {"type": "integer", "minimum": cls._min, "maximum": cls._max}
+ def json_schema(cls: Type["_PatternBase"]) -> Dict[Any, Any]:
+ return {"type": "string", "pattern": rf"{cls._re.pattern}"}
-class PortNumber(IntRange):
+class PortNumber(_IntRangeBase):
_min: int = 1
_max: int = 65_535
raise SchemaException("Failed to resolve given file path. Is there a symlink loop?", object_path) from e
-class DomainName(CustomValueType):
+class DomainName(_PatternBase):
_re = re.compile(
r"^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|"
r"([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|"
r"([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})($|.$)"
)
+
+class InterfacePort(_StrCustomBase):
+ if_name: str
+ port: PortNumber
+
def __init__(self, source_value: Any, object_path: str = "/") -> None:
super().__init__(source_value)
if isinstance(source_value, str):
- if type(self)._re.match(source_value):
- self._value: str = source_value
+ if "@" in source_value:
+ parts = source_value.split("@", 1)
+ try:
+ self.port = PortNumber(int(parts[1]))
+ except ValueError as e:
+ raise SchemaException("Failed to parse port.", object_path) from e
+ self.if_name = parts[0]
else:
- raise SchemaException(f"'{source_value}' is not valid domain name", object_path)
+ raise SchemaException("Missing port number '<interface>@<port>'.", object_path)
+ self._value = source_value
else:
raise SchemaException(
- f"Unexpected input type for DomainName type - {type(source_value)}."
- "Cause might be invalid format or invalid type.",
+ f"Unexpected value for '<interface>@<port>'. Expected string, got '{source_value}'"
+ f" with type '{type(source_value)}'",
object_path,
)
- def to_std(self) -> str:
- return self._value
-
- def __hash__(self) -> int:
- return hash(self._value)
-
- def __str__(self) -> str:
- return self._value
-
- def __int__(self) -> int:
- raise ValueError("Can't convert DomainName to an integer")
-
- def __eq__(self, o: object) -> bool:
- """
- Two instances of DomainName are equal when they represent same string.
- """
- return isinstance(o, DomainName) and str(o._value) == str(self._value)
-
- def serialize(self) -> Any:
- return str(self._value)
- @classmethod
- def json_schema(cls: Type["DomainName"]) -> Dict[Any, Any]:
- return {
- "type": "string",
- }
-
-
-class InterfacePort(CustomValueType):
- intrfc: str
+class InterfaceOptionalPort(_StrCustomBase):
+ if_name: str
port: Optional[PortNumber] = None
def __init__(self, source_value: Any, object_path: str = "/") -> None:
super().__init__(source_value)
if isinstance(source_value, str):
if "@" in source_value:
- sep = source_value.split("@", 1)
+ parts = source_value.split("@", 1)
try:
- self.port = PortNumber(int(sep[1]))
+ self.port = PortNumber(int(parts[1]))
except ValueError as e:
raise SchemaException("Failed to parse port.", object_path) from e
- self.intrfc = sep[0]
+ self.if_name = parts[0]
else:
- self.intrfc = source_value
+ self.if_name = source_value
self._value = source_value
-
else:
raise SchemaException(
f"Unexpected value for a '<interface>[@<port>]'. Expected string, got '{source_value}'"
object_path,
)
- def to_std(self) -> str:
- return self._value
- def __str__(self) -> str:
- return self._value
-
- def __int__(self) -> int:
- raise ValueError("Can't convert InterfacePort to an integer")
+class IPAddressPort(_StrCustomBase):
+ addr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
+ port: PortNumber
- def __eq__(self, o: object) -> bool:
- """
- Two instances of InterfacePort are equal when they represent same string.
- """
- return isinstance(o, InterfacePort) and str(o._value) == str(self._value)
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
+ super().__init__(source_value)
+ if isinstance(source_value, str):
+ if "@" in source_value:
+ parts = source_value.split("@", 1)
+ try:
+ self.port = PortNumber(int(parts[1]))
+ except ValueError as e:
+ raise SchemaException("Failed to parse port.", object_path) from e
- def serialize(self) -> Any:
- return str(self._value)
+ try:
+ self.addr = ipaddress.ip_address(parts[0])
+ except ValueError as e:
+ raise SchemaException("Failed to parse IP address.", object_path) from e
+ else:
+ raise SchemaException("Missing port number '<ip-address>@<port>'.", object_path)
+ self._value = source_value
- @classmethod
- def json_schema(cls: Type["InterfacePort"]) -> Dict[Any, Any]:
- return {
- "type": "string",
- }
+ else:
+ raise SchemaException(
+ f"Unexpected value for a '<ip-address>@<port>'. Expected string, got '{source_value}'"
+ f" with type '{type(source_value)}'",
+ object_path,
+ )
-class IPAddressPort(CustomValueType):
+class IPAddressOptionalPort(_StrCustomBase):
addr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
port: Optional[PortNumber] = None
def __init__(self, source_value: Any, object_path: str = "/") -> None:
- super().__init__(source_value)
if isinstance(source_value, str):
if "@" in source_value:
- sep = source_value.split("@", 1)
+ parts = source_value.split("@", 1)
try:
- self.port = PortNumber(int(sep[1]))
+ self.port: Optional[PortNumber] = PortNumber(int(parts[1]))
except ValueError as e:
raise SchemaException("Failed to parse port.", object_path) from e
try:
- self.addr = ipaddress.ip_address(sep[0])
+ self.addr = ipaddress.ip_address(parts[0])
except ValueError as e:
raise SchemaException("Failed to parse IP address.", object_path) from e
else:
object_path,
)
- def to_std(self) -> str:
- return self._value
-
- def __str__(self) -> str:
- return self._value
-
- def __int__(self) -> int:
- raise ValueError("Can't convert IP address to an integer")
-
- def __eq__(self, o: object) -> bool:
- """
- Two instances of IPAddressPORT are equal when they represent same string.
- """
- return isinstance(o, IPAddressPort) and str(o._value) == str(self._value)
-
- def serialize(self) -> Any:
- return str(self._value)
-
- @classmethod
- def json_schema(cls: Type["IPAddressPort"]) -> Dict[Any, Any]:
- return {
- "type": "string",
- }
-
class IPv4Address(CustomValueType):
def __init__(self, source_value: Any, object_path: str = "/") -> None:
@classmethod
def json_schema(cls: Type["IPv6Network96"]) -> Dict[Any, Any]:
return {"type": "string"}
-
-
-class ListenType(Enum):
- IP = auto()
- UNIX_SOCKET = auto()
- INTERFACE = auto()
-
-
-class Listen(SchemaNode, Serializable):
- class Raw(SchemaNode):
- ip: Optional[IPAddress] = None
- port: Optional[int] = None
- unix_socket: Optional[CheckedPath] = None
- interface: Optional[str] = None
-
- _PREVIOUS_SCHEMA = Raw
-
- typ: ListenType
- ip: Optional[IPAddress]
- port: Optional[int]
- unix_socket: Optional[CheckedPath]
- interface: Optional[str]
-
- def _typ(self, origin: Raw) -> ListenType:
- present = {
- "ip" if origin.ip is not None else ...,
- "port" if origin.port is not None else ...,
- "unix_socket" if origin.unix_socket is not None else ...,
- "interface" if origin.interface is not None else ...,
- }
-
- if present == {"ip", ...} or present == {"ip", "port", ...}:
- return ListenType.IP
- elif present == {"unix_socket", ...}:
- return ListenType.UNIX_SOCKET
- elif present == {"interface", ...} or present == {"interface", "port", ...}:
- return ListenType.INTERFACE
- else:
- raise ValueError(
- "Listen configuration contains multiple incompatible options at once. "
- "You can use (IP and PORT) or (UNIX_SOCKET) or (INTERFACE and PORT)."
- )
-
- def _port(self, origin: Raw) -> Optional[int]:
- if origin.port is None:
- return None
- if not 0 <= origin.port <= 65_535:
- raise ValueError(f"Port value {origin.port} out of range of usual 2-byte port value")
- return origin.port
-
- def _validate(self) -> None:
- # we already check that it's there is only one option in the `_typ` method
- pass
-
- def __str__(self) -> str:
- if self.typ is ListenType.IP:
- return f"{self.ip} @ {self.port}"
- elif self.typ is ListenType.UNIX_SOCKET:
- return f"{self.unix_socket}"
- elif self.typ is ListenType.INTERFACE:
- return f"{self.interface} @ {self.port}"
- else:
- raise NotImplementedError()
-
- def __eq__(self, o: object) -> bool:
- if not isinstance(o, Listen):
- return False
-
- return (
- self.port == o.port
- and self.ip == o.ip
- and self.typ == o.typ
- and self.unix_socket == o.unix_socket
- and self.interface == o.interface
- )
-
- def to_dict(self) -> Dict[Any, Any]:
- if self.typ is ListenType.IP:
- return {"port": self.port, "ip": str(self.ip)}
- elif self.typ is ListenType.UNIX_SOCKET:
- return {"unix_socket": str(self.unix_socket)}
- elif self.typ is ListenType.INTERFACE:
- return {"interface": self.interface, "port": self.port}
- else:
- raise NotImplementedError()
from knot_resolver_manager.config_store import ConfigStore
from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE
from knot_resolver_manager.datamodel.config_schema import KresConfig
-from knot_resolver_manager.datamodel.types import Listen, ListenType
+from knot_resolver_manager.datamodel.server_schema import ManagementSchema
from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException, TreeException
from knot_resolver_manager.kresd_controller import get_controller_by_name
from knot_resolver_manager.kresd_controller.interface import SubprocessController
# HTTP server
self.app = Application(middlewares=[error_handler])
self.runner = AppRunner(self.app)
- self.listen: Optional[Listen] = None
+ self.listen: Optional[ManagementSchema] = None
self.site: Union[NoneType, TCPSite, UnixSite] = None
self.listen_lock = asyncio.Lock()
self._config_path: Optional[Path] = config_path
await self._reconfigure_listen_address(config)
async def _deny_listen_address_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
- if config_old.server.management.listen != config_new.server.management.listen:
+ if config_old.server.management != config_new.server.management:
return Result.err(
"Changing API listen address dynamically is not allowed as it's really dangerous. If you"
" really need this feature, please contact the developers and explain why. Technically,"
mgn = config.server.management
# if the listen address did not change, do nothing
- if self.listen == mgn.listen:
+ if self.listen == mgn:
return
# start the new listen address
nsite: Union[web.TCPSite, web.UnixSite]
- if mgn.listen.typ is ListenType.UNIX_SOCKET:
- nsite = web.UnixSite(self.runner, str(mgn.listen.unix_socket))
- logger.info(f"Starting API HTTP server on http+unix://{mgn.listen.unix_socket}")
- elif mgn.listen.typ is ListenType.IP:
- nsite = web.TCPSite(self.runner, str(mgn.listen.ip), mgn.listen.port)
- logger.info(f"Starting API HTTP server on http://{mgn.listen.ip}:{mgn.listen.port}")
+ if mgn.unix_socket:
+ nsite = web.UnixSite(self.runner, str(mgn.unix_socket))
+ logger.info(f"Starting API HTTP server on http+unix://{mgn.unix_socket}")
+ elif mgn.ip_address:
+ nsite = web.TCPSite(self.runner, str(mgn.ip_address.addr), int(mgn.ip_address.port))
+ logger.info(f"Starting API HTTP server on http://{mgn.ip_address.addr}:{mgn.ip_address.port}")
else:
- raise KresManagerException(f"Requested API on unsupported configuration format {mgn.listen.typ}")
+ raise KresManagerException(f"Requested API on unsupported configuration format.")
await nsite.start()
# stop the old listen
assert (self.listen is None) == (self.site is None)
if self.listen is not None and self.site is not None:
- if self.listen.typ is ListenType.UNIX_SOCKET:
- logger.info(f"Stopping API HTTP server on http+unix://{mgn.listen.unix_socket}")
- elif mgn.listen.typ is ListenType.IP:
- logger.info(f"Stopping API HTTP server on http://{mgn.listen.ip}:{mgn.listen.port}")
+ if self.listen.unix_socket:
+ logger.info(f"Stopping API HTTP server on http+unix://{mgn.unix_socket}")
+ elif self.listen.ip_address:
+ logger.info(
+ f"Stopping API HTTP server on http://{self.listen.ip_address.addr}:{self.listen.ip_address.port}"
+ )
await self.site.stop()
# save new state
- self.listen = mgn.listen
+ self.listen = mgn
self.site = nsite
async def shutdown(self) -> None:
workers: 1
rundir: tests/integration/run
management:
- listen:
- ip: 127.0.0.1
- port: 5001
+ ip-address: 127.0.0.1@5001
cache:
storage: cache
logging:
from knot_resolver_manager.datamodel.config_schema import template_from_str
-from knot_resolver_manager.datamodel.types import IPAddressPort
+from knot_resolver_manager.datamodel.types import IPAddressOptionalPort
def test_view_tsig():
def test_view_addr():
- addr: IPAddressPort = IPAddressPort("10.0.0.1")
+ addr: IPAddressOptionalPort = IPAddressOptionalPort("10.0.0.1")
rule = "policy.all(policy.DENY)"
tmpl_str = """{% from 'macros/view_macros.lua.j2' import view_addr %}
{{ view_addr(addr, rule) }}"""
from knot_resolver_manager.datamodel.types import (
CheckedPath,
DomainName,
+ InterfaceOptionalPort,
+ InterfacePort,
IPAddress,
+ IPAddressOptionalPort,
IPAddressPort,
IPNetwork,
IPv4Address,
IPv6Address,
IPv6Network96,
- Listen,
- ListenType,
SizeUnit,
TimeUnit,
)
TestSchema({"name": "b@d.domain.com."})
-def test_ipaddress_port():
+def test_interface_port():
+ class TestSchema(SchemaNode):
+ interface: InterfacePort
+
+ o = TestSchema({"interface": "lo@5353"})
+ assert str(o.interface) == "lo@5353"
+ assert o.interface == InterfacePort("lo@5353")
+
+ with raises(KresManagerException):
+ TestSchema({"interface": "lo"})
+ with raises(KresManagerException):
+ TestSchema({"interface": "lo@"})
+ with raises(KresManagerException):
+ TestSchema({"interface": "lo@-1"})
+ with raises(KresManagerException):
+ TestSchema({"interface": "lo@65536"})
+
+
+def test_interface_optional_port():
+ class TestSchema(SchemaNode):
+ interface: InterfaceOptionalPort
+
+ o = TestSchema({"interface": "lo"})
+ assert str(o.interface) == "lo"
+ assert o.interface == InterfaceOptionalPort("lo")
+
+ o = TestSchema({"interface": "lo@5353"})
+ assert str(o.interface) == "lo@5353"
+ assert o.interface == InterfaceOptionalPort("lo@5353")
+
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "lo@"})
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "lo@-1"})
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "lo@65536"})
+
+
+def test_ip_address_port():
class TestSchema(SchemaNode):
ip_port: IPAddressPort
+ o = TestSchema({"ip-port": "123.4.5.6@5353"})
+ assert str(o.ip_port) == "123.4.5.6@5353"
+ assert o.ip_port == IPAddressOptionalPort("123.4.5.6@5353")
+
+ o = TestSchema({"ip-port": "2001:db8::1000@53"})
+ assert str(o.ip_port) == "2001:db8::1000@53"
+ assert o.ip_port == IPAddressOptionalPort("2001:db8::1000@53")
+
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "123.4.5.6"})
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "2001:db8::1000"})
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "123.4.5.6.7@5000"})
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "2001:db8::10000@5001"})
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "123.4.5.6@"})
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "123.4.5.6@-1"})
+ with raises(KresManagerException):
+ TestSchema({"ip-port": "123.4.5.6@65536"})
+
+
+def test_ip_address_optional_port():
+ class TestSchema(SchemaNode):
+ ip_port: IPAddressOptionalPort
+
o = TestSchema({"ip-port": "123.4.5.6"})
assert str(o.ip_port) == "123.4.5.6"
- assert o.ip_port == IPAddressPort("123.4.5.6")
+ assert o.ip_port == IPAddressOptionalPort("123.4.5.6")
o = TestSchema({"ip-port": "123.4.5.6@5353"})
assert str(o.ip_port) == "123.4.5.6@5353"
- assert o.ip_port == IPAddressPort("123.4.5.6@5353")
+ assert o.ip_port == IPAddressOptionalPort("123.4.5.6@5353")
o = TestSchema({"ip-port": "2001:db8::1000"})
assert str(o.ip_port) == "2001:db8::1000"
- assert o.ip_port == IPAddressPort("2001:db8::1000")
+ assert o.ip_port == IPAddressOptionalPort("2001:db8::1000")
o = TestSchema({"ip-port": "2001:db8::1000@53"})
assert str(o.ip_port) == "2001:db8::1000@53"
- assert o.ip_port == IPAddressPort("2001:db8::1000@53")
+ assert o.ip_port == IPAddressOptionalPort("2001:db8::1000@53")
with raises(KresManagerException):
TestSchema({"ip-port": "123.4.5.6.7"})
TestSchema({"ip-port": "123.4.5.6@65536"})
-def test_ipaddress():
+def test_ip_address():
class TestSchema(SchemaNode):
ip: IPAddress
TestSchema({"ip": "123456"})
-def test_listen():
- o = Listen({"unix-socket": "/tmp"})
-
- assert o.typ == ListenType.UNIX_SOCKET
- assert o.ip is None
- assert o.port is None
- assert o.unix_socket is not None
- assert o.interface is None
-
- o = Listen({"interface": "eth0", "port": 56})
-
- assert o.typ == ListenType.INTERFACE
- assert o.ip is None
- assert o.port == 56
- assert o.unix_socket is None
- assert o.interface == "eth0"
-
- o = Listen({"ip": "123.4.5.6"})
-
- assert o.typ == ListenType.IP
- assert o.ip == IPv4Address("123.4.5.6")
- assert o.port == None
- assert o.unix_socket is None
- assert o.interface is None
-
- # check failure
- with raises(KresManagerException):
- Listen({"unix-socket": "/tmp", "ip": "127.0.0.1"})
- with raises(KresManagerException):
- Listen({"unix-socket": "/tmp", "port": 853})
-
-
def test_network():
o = IPNetwork("10.11.12.0/24")
assert o.to_std().prefixlen == 24
from pytest import raises
from knot_resolver_manager.datamodel.network_schema import ListenSchema, NetworkSchema
-from knot_resolver_manager.datamodel.types import IPAddressPort, PortNumber
+from knot_resolver_manager.datamodel.types import IPAddressOptionalPort, PortNumber
from knot_resolver_manager.exceptions import KresManagerException
assert len(o.listen) == 2
# {"ip-address": "127.0.0.1"}
- assert o.listen[0].ip_address == IPAddressPort("127.0.0.1")
+ assert o.listen[0].ip_address == IPAddressOptionalPort("127.0.0.1")
assert o.listen[0].port == PortNumber(53)
assert o.listen[0].kind == "dns"
assert o.listen[0].freebind == False
# {"ip-address": "::1", "freebind": True}
- assert o.listen[1].ip_address == IPAddressPort("::1")
+ assert o.listen[1].ip_address == IPAddressOptionalPort("::1")
assert o.listen[1].port == PortNumber(53)
assert o.listen[1].kind == "dns"
assert o.listen[1].freebind == True
assert doh2.port == PortNumber(443)
-def test_listen_unix_socket():
+def test_listen_unix_socket_valid():
assert ListenSchema({"unix-socket": "/tmp/kresd-socket"})
assert ListenSchema({"unix-socket": ["/tmp/kresd-socket", "/tmp/kresd-socket2"]})
+
+def test_listen_unix_socket_invalid():
with raises(KresManagerException):
ListenSchema({"ip-address": "::1", "unix-socket": "/tmp/kresd-socket"})
with raises(KresManagerException):
ListenSchema({"unit-socket": "/tmp/kresd-socket", "port": "53"})
-def test_listen_ip_address():
+def test_listen_ip_address_valid():
assert ListenSchema({"ip-address": "::1"})
assert ListenSchema({"ip-address": "::1@5353"})
assert ListenSchema({"ip-address": "::1", "port": 5353})
assert ListenSchema({"ip-address": ["127.0.0.1@5353", "::1@5353"]})
assert ListenSchema({"ip-address": ["127.0.0.1", "::1"], "port": 5353})
+
+def test_listen_ip_address_invalid():
with raises(KresManagerException):
ListenSchema({"ip-address": "::1@5353", "port": 5353})
with raises(KresManagerException):
ListenSchema({"ip-address": ["127.0.0.1@5353", "::1@5353"], "port": 5353})
-def test_listen_interface():
+def test_listen_interface_valid():
assert ListenSchema({"interface": "lo"})
assert ListenSchema({"interface": "lo@5353"})
assert ListenSchema({"interface": "lo", "port": 5353})
assert ListenSchema({"interface": ["lo@5353", "eth0@5353"]})
assert ListenSchema({"interface": ["lo", "eth0"], "port": 5353})
+
+def test_listen_interface_invalid():
with raises(KresManagerException):
ListenSchema({"interface": "lo@5353", "port": 5353})
with raises(KresManagerException):
ListenSchema({"interface": ["lo@5353", "eth0@5353"], "port": 5353})
-def test_listen_validation():
+def test_listen_invalid():
+ with raises(KresManagerException):
+ ListenSchema({"ip-address": "::1", "port": 0})
with raises(KresManagerException):
- ListenSchema({"ip-address": "::1", "port": -10})
+ ListenSchema({"ip-address": "::1", "port": 65_536})
with raises(KresManagerException):
- ListenSchema({"ip-address": "::1", "interface": "eth0"})
+ ListenSchema({"ip-address": "::1", "interface": "lo"})
from pytest import raises
-from knot_resolver_manager.datamodel.server_schema import ServerSchema
+from knot_resolver_manager.datamodel.server_schema import ManagementSchema, ServerSchema
from knot_resolver_manager.exceptions import KresManagerException
with raises(KresManagerException):
ServerSchema({"backend": "supervisord", "watchdog": {"qname": "nic.cz.", "qtype": "A"}})
+
+
+def test_management():
+ assert ManagementSchema({"ip-address": "::1@53"})
+ assert ManagementSchema({"unix-socket": "/path/socket"})
+
+ with raises(KresManagerException):
+ ManagementSchema()
+ with raises(KresManagerException):
+ ManagementSchema({"ip-address": "::1@53", "unix-socket": "/path/socket"})