- supervisord
network:
listen:
- - interface: [127.0.0.1@5353]
+ - interface: 127.0.0.1@5353
- supervisord
network:
listen:
- - interface: [127.0.0.1@5353]
+ - interface: 127.0.0.1@5353
views:
- subnets: [127.0.0.0/24]
from typing_extensions import Literal
-from knot_resolver_manager.datamodel.types import DomainName, IPAddressOptionalPort
+from knot_resolver_manager.datamodel.types import DomainName, IPAddressOptionalPort, ListOrItem
from knot_resolver_manager.datamodel.types.files import FilePath
from knot_resolver_manager.utils.modeling import ConfigSchema
ca_file: Path to CA certificate file.
"""
- address: List[IPAddressOptionalPort]
+ address: ListOrItem[IPAddressOptionalPort]
transport: Optional[Literal["tls"]] = None
- pin_sha256: Optional[List[str]] = None
+ pin_sha256: Optional[str] = None
hostname: Optional[DomainName] = None
ca_file: Optional[FilePath] = None
IPNetwork,
IPv4Address,
IPv6Address,
+ ListOrItem,
PortNumber,
SizeUnit,
)
freebind: Used for binding to non-local address.
"""
- interface: Optional[List[InterfaceOptionalPort]] = None
- unix_socket: Optional[List[FilePath]] = None
+ interface: Optional[ListOrItem[InterfaceOptionalPort]] = None
+ unix_socket: Optional[ListOrItem[FilePath]] = None
port: Optional[PortNumber] = None
kind: KindEnum = "dns"
freebind: bool = False
_LAYER = Raw
- interface: Optional[List[InterfaceOptionalPort]]
- unix_socket: Optional[List[FilePath]]
+ interface: Optional[ListOrItem[InterfaceOptionalPort]]
+ unix_socket: Optional[ListOrItem[FilePath]]
port: Optional[PortNumber]
kind: KindEnum
freebind: bool
- def _interface(self, origin: Raw) -> Optional[List[InterfaceOptionalPort]]:
- if isinstance(origin.interface, list):
+ def _interface(self, origin: Raw) -> Optional[ListOrItem[InterfaceOptionalPort]]:
+ if origin.interface:
port_set: Optional[bool] = None
- for intrfc in origin.interface:
+ for intrfc in origin.interface: # type: ignore[attr-defined]
if origin.port and intrfc.port:
raise ValueError("The port number is defined in two places ('port' option and '@<port>' syntax).")
if port_set is not None and (bool(intrfc.port) != port_set):
"The '@<port>' syntax must be used either for all or none of the interface in the list."
)
port_set = bool(intrfc.port)
- elif isinstance(origin.interface, InterfaceOptionalPort) and origin.interface.port and origin.port:
- raise ValueError("The port number is defined in two places ('port' option and '@<port>' syntax).")
return origin.interface
def _port(self, origin: Raw) -> Optional[PortNumber]:
tls: TLSSchema = TLSSchema()
proxy_protocol: Union[Literal[False], ProxyProtocolSchema] = False
listen: List[ListenSchema] = [
- ListenSchema({"interface": ["127.0.0.1"]}),
- ListenSchema({"interface": ["::1"], "freebind": True}),
+ ListenSchema({"interface": "127.0.0.1"}),
+ ListenSchema({"interface": "::1", "freebind": True}),
]
{% macro network_listen(listen) -%}
{%- if listen.unix_socket -%}
- {%- if listen.unix_socket is iterable-%}
- {% for path in listen.unix_socket -%}
- {{ net_listen_unix_socket(path, listen.kind, listen.freebind) }}
- {% endfor -%}
- {%- else -%}
- {{ net_listen_unix_socket(listen.unix_socket, listen.kind, listen.freebind) }}
- {%- endif -%}
+{% for path in listen.unix_socket %}
+{{ net_listen_unix_socket(path, listen.kind, listen.freebind) }}
+{% endfor %}
{%- elif listen.interface -%}
- {%- if listen.interface is iterable-%}
- {% for interface in listen.interface -%}
- {{ net_listen_interface(interface, listen.kind, listen.freebind, listen.port) }}
- {% endfor -%}
- {%- else -%}
- {{ net_listen_interface(listen.interface, listen.kind, listen.freebind, listen.port) }}
- {%- endif -%}
+{% for interface in listen.interface %}
+{{ net_listen_interface(interface, listen.kind, listen.freebind, listen.port) }}
+{% endfor %}
{%- endif -%}
{%- endmacro %}
\ No newline at end of file
from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum
from .files import AbsoluteDir, Dir, File, FilePath
+from .generic_types import ListOrItem
from .types import (
DomainName,
IDPattern,
"IPv4Address",
"IPv6Address",
"IPv6Network96",
+ "ListOrItem",
"Percent",
"PortNumber",
"SizeUnit",
--- /dev/null
+from typing import Any, List, TypeVar, Union
+
+from knot_resolver_manager.utils.modeling import BaseGenericTypeWrapper
+
+T = TypeVar("T")
+
+
+class ListOrItem(BaseGenericTypeWrapper[Union[List[T], T]]):
+
+ _value_orig: Union[List[T], T]
+ _list: List[T]
+
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
+ super().__init__(source_value)
+ self._value_orig: Union[List[T], T] = source_value
+ self._list: List[T] = source_value if isinstance(source_value, list) else [source_value]
+
+ def __getitem__(self, index: Any) -> T:
+ return self._list[index]
+
+ def __int__(self) -> int:
+ raise ValueError(f"Can't convert '{type(self).__name__}' to an integer.")
+
+ def __str__(self) -> str:
+ return str(self._value_orig)
+
+ def to_std(self) -> List[T]:
+ return self._list
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, ListOrItem) and o._value_orig == self._value_orig
+
+ def serialize(self) -> Union[List[T], T]:
+ return self._value_orig
import ipaddress
import re
-from typing import Any, Dict, List, Optional, Type, TypeVar, Union
+from typing import Any, Dict, Optional, Type, Union
from knot_resolver_manager.datamodel.types.base_types import IntRangeBase, PatternBase, StrBase, UnitBase
from knot_resolver_manager.utils.modeling import BaseValueType
-_InnerType = TypeVar("_InnerType")
-ListOrSingle = List[_InnerType]
-
class IntNonNegative(IntRangeBase):
_min: int = 0
return self._value
def mbytes(self) -> int:
- return self._value // 1024 ** 2
+ return self._value // 1024**2
class TimeUnit(UnitBase):
- _units = {"us": 1, "ms": 10 ** 3, "s": 10 ** 6, "m": 60 * 10 ** 6, "h": 3600 * 10 ** 6, "d": 24 * 3600 * 10 ** 6}
+ _units = {"us": 1, "ms": 10**3, "s": 10**6, "m": 60 * 10**6, "h": 3600 * 10**6, "d": 24 * 3600 * 10**6}
def seconds(self) -> int:
- return self._value // 1000 ** 2
+ return self._value // 1000**2
def millis(self) -> int:
return self._value // 1000
tmpl = template_from_str(tmpl_str)
soc = ListenSchema({"unix-socket": "/tmp/kresd-socket", "kind": "dot"})
- assert tmpl.render(listen=soc) == "net.listen('/tmp/kresd-socket',nil,{kind='tls',freebind=false})"
- soc_list = ListenSchema({"unix-socket": [soc.unix_socket, "/tmp/kresd-socket2"], "kind": "dot"})
+ assert tmpl.render(listen=soc) == "net.listen('/tmp/kresd-socket',nil,{kind='tls',freebind=false})\n"
+ soc_list = ListenSchema({"unix-socket": [soc.unix_socket.to_std()[0], "/tmp/kresd-socket2"], "kind": "dot"})
assert (
tmpl.render(listen=soc_list)
== "net.listen('/tmp/kresd-socket',nil,{kind='tls',freebind=false})\n"
)
ip = ListenSchema({"interface": "::1@55", "freebind": True})
- assert tmpl.render(listen=ip) == "net.listen('::1',55,{kind='dns',freebind=true})"
- ip_list = ListenSchema({"interface": [ip.interface, "127.0.0.1@5353"]})
+ assert tmpl.render(listen=ip) == "net.listen('::1',55,{kind='dns',freebind=true})\n"
+ ip_list = ListenSchema({"interface": [ip.interface.to_std()[0], "127.0.0.1@5353"]})
assert (
tmpl.render(listen=ip_list)
== "net.listen('::1',55,{kind='dns',freebind=false})\n"
)
intrfc = ListenSchema({"interface": "eth0", "kind": "doh2"})
- assert tmpl.render(listen=intrfc) == "net.listen(net.eth0,443,{kind='doh2',freebind=false})"
- intrfc_list = ListenSchema({"interface": [intrfc.interface, "lo"], "port": 5555, "kind": "doh2"})
+ assert tmpl.render(listen=intrfc) == "net.listen(net.eth0,443,{kind='doh2',freebind=false})\n"
+ intrfc_list = ListenSchema({"interface": [intrfc.interface.to_std()[0], "lo"], "port": 5555, "kind": "doh2"})
assert (
tmpl.render(listen=intrfc_list)
== "net.listen(net.eth0,5555,{kind='doh2',freebind=false})\n"
try:
_ = json.dumps(obj)
except BaseException as e:
- raise Exception(f"failed to serialize '{path}'") from e
+ raise Exception(f"failed to serialize '{path}': {e}") from e
recser(dct)
assert len(o.listen) == 2
# {"ip-address": "127.0.0.1"}
- assert o.listen[0].interface == InterfaceOptionalPort("127.0.0.1")
+ assert o.listen[0].interface.to_std() == [InterfaceOptionalPort("127.0.0.1")]
assert o.listen[0].port == PortNumber(53)
assert o.listen[0].kind == "dns"
assert o.listen[0].freebind == False
# {"ip-address": "::1", "freebind": True}
- assert o.listen[1].interface == InterfaceOptionalPort("::1")
+ assert o.listen[1].interface.to_std() == [InterfaceOptionalPort("::1")]
assert o.listen[1].port == PortNumber(53)
assert o.listen[1].kind == "dns"
assert o.listen[1].freebind == True
@pytest.mark.parametrize(
"listen,port",
[
- ({"unix-socket": "/tmp/kresd-socket"}, None),
- ({"interface": "::1"}, 53),
- ({"interface": "::1", "kind": "dot"}, 853),
- ({"interface": "::1", "kind": "doh-legacy"}, 443),
- ({"interface": "::1", "kind": "doh2"}, 443),
+ ({"unix-socket": ["/tmp/kresd-socket"]}, None),
+ ({"interface": ["::1"]}, 53),
+ ({"interface": ["::1"], "kind": "dot"}, 853),
+ ({"interface": ["::1"], "kind": "doh-legacy"}, 443),
+ ({"interface": ["::1"], "kind": "doh2"}, 443),
],
)
def test_listen_port_defaults(listen: Dict[str, Any], port: Optional[int]):
@pytest.mark.parametrize(
"listen",
[
- {"unit-socket": "/tmp/kresd-socket", "port": "53"},
- {"interface": "::1", "unit-socket": "/tmp/kresd-socket"},
+ {"unix-socket": "/tmp/kresd-socket", "port": "53"},
+ {"interface": "::1", "unix-socket": "/tmp/kresd-socket"},
{"interface": "::1@5353", "port": 5353},
{"interface": ["127.0.0.1", "::1@5353"]},
{"interface": ["127.0.0.1@5353", "::1@5353"], "port": 5353},
--- /dev/null
+from typing import Any, List, Optional, Union
+
+import pytest
+from pytest import raises
+
+from knot_resolver_manager.datamodel.types import ListOrItem
+from knot_resolver_manager.utils.modeling import BaseSchema
+from knot_resolver_manager.utils.modeling.exceptions import DataValidationError
+from knot_resolver_manager.utils.modeling.types import get_generic_type_wrapper_argument
+
+
+@pytest.mark.parametrize("val", [str, int])
+def test_list_or_item_inner_type(val: Any):
+ assert get_generic_type_wrapper_argument(ListOrItem[val]) == Union[List[val], val]
+
+
+@pytest.mark.parametrize(
+ "typ,val",
+ [
+ (int, [1, 65_535, 5353, 5000]),
+ (int, 65_535),
+ (str, ["string1", "string2"]),
+ (str, "string1"),
+ ],
+)
+def test_list_or_item_valid(typ: Any, val: Any):
+ class ListOrItemSchema(BaseSchema):
+ test: ListOrItem[typ]
+
+ o = ListOrItemSchema({"test": val})
+ assert o.test.serialize() == val
+ assert o.test.to_std() == val if isinstance(val, list) else [val]
+
+ i = 0
+ for item in o.test:
+ assert item == val[i] if isinstance(val, list) else val
+ i += 1
+
+
+@pytest.mark.parametrize(
+ "typ,val",
+ [
+ (str, [True, False, True, False]),
+ (str, False),
+ (bool, [1, 65_535, 5353, 5000]),
+ (bool, 65_535),
+ (int, "string1"),
+ (int, ["string1", "string2"]),
+ ],
+)
+def test_list_or_item_invalid(typ: Any, val: Any):
+ class ListOrItemSchema(BaseSchema):
+ test: ListOrItem[typ]
+
+ with raises(DataValidationError):
+ ListOrItemSchema({"test": val})