information regarding copyright ownership.
"""
-from typing import Dict, List, Optional, Type
+from typing import Optional
import abc
class ResponseSpoofer(ResponseHandler, abc.ABC):
- spoofers: Dict[str, Type["ResponseSpoofer"]] = {}
+ spoofers: dict[str, type["ResponseSpoofer"]] = {}
def __init_subclass__(cls, mode: str) -> None:
assert mode not in cls.spoofers
self._current_handler: Optional[ResponseSpoofer] = None
def handle(
- self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
+ self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
if len(args) != 1:
qctx.response.set_rcode(dns.rcode.SERVFAIL)
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import Dict
import time
@pytest.fixture(autouse=True)
-def autouse_flush_resolver_cache(servers: Dict[str, NamedInstance]) -> None:
+def autouse_flush_resolver_cache(servers: dict[str, NamedInstance]) -> None:
servers["ns4"].rndc("flush")
)
-def test_bailiwick_sibling_ns_referral(servers: Dict[str, NamedInstance]) -> None:
+def test_bailiwick_sibling_ns_referral(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="sibling-ns", ans2="none")
ns4 = servers["ns4"]
check_domain_hijack(ns4)
-def test_bailiwick_unsolicited_authority(servers: Dict[str, NamedInstance]) -> None:
+def test_bailiwick_unsolicited_authority(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="unsolicited-ns")
ns4 = servers["ns4"]
check_domain_hijack(ns4)
-def test_bailiwick_parent_glue(servers: Dict[str, NamedInstance]) -> None:
+def test_bailiwick_parent_glue(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="parent-glue")
ns4 = servers["ns4"]
check_domain_hijack(ns4)
-def test_bailiwick_spoofed_dname(servers: Dict[str, NamedInstance]) -> None:
+def test_bailiwick_spoofed_dname(servers: dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="dname")
ns4 = servers["ns4"]
from dataclasses import dataclass
from enum import Enum
-from typing import AsyncGenerator, List, Optional, Tuple
+from typing import AsyncGenerator, Optional
import abc
import logging
def __init__(self, name_generator: ChainNameGenerator) -> None:
self._name_generator = name_generator
- def get_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ def get_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
"""
Return the lists of records and their signatures that should be
generated in response to a given "action".
raise NotImplementedError
@abc.abstractmethod
- def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
"""
Return the lists of records and their signatures that should be
generated in response to a given "action".
response_count = 1
- def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
target = self._name_generator.generate_next_name().to_text()
response = self.create_rrset(owner, dns.rdatatype.CNAME, target)
response_count = 2
- def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
dname_owner = self._name_generator.current_domain
cname_owner = self._name_generator.current_name
dname_target = self._name_generator.generate_next_sld().to_text()
response_count = 1
- def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
target = self._name_generator.generate_next_name_in_next_sld().to_text()
response = self.create_rrset(owner, dns.rdatatype.CNAME, target)
response_count = 1
- def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
owner = self._name_generator.current_name
response = self.create_rrset(owner, dns.rdatatype.A, "10.53.0.4")
signature = self.create_rrset_signature(owner, response.rdtype)
self._current_handler: Optional[ChainResponseHandler] = None
def handle(
- self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
+ self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
try:
actions, selectors = self._parse_args(args)
return "chain response setup successful"
def _parse_args(
- self, args: List[str]
- ) -> Tuple[List[ChainAction], List[ChainSelector]]:
+ self, args: list[str]
+ ) -> tuple[list[ChainAction], list[ChainSelector]]:
try:
delimiter = args.index("_")
except ValueError as exc:
return actions, selectors
- def _parse_args_actions(self, args_actions: List[str]) -> List[ChainAction]:
+ def _parse_args_actions(self, args_actions: list[str]) -> list[ChainAction]:
actions = []
for action in args_actions + ["FINAL"]:
return actions
def _parse_args_selectors(
- self, args_selectors: List[str], actions: List[ChainAction]
- ) -> List[ChainSelector]:
+ self, args_selectors: list[str], actions: list[ChainAction]
+ ) -> list[ChainSelector]:
max_response_index = self._get_max_response_index(actions)
selectors = []
return selectors
- def _get_max_response_index(self, actions: List[ChainAction]) -> int:
+ def _get_max_response_index(self, actions: list[ChainAction]) -> int:
rrset_generator_classes = [a.value for a in actions]
return sum(g.response_count for g in rrset_generator_classes)
def _prepare_answer(
- self, actions: List[ChainAction], selectors: List[ChainSelector]
- ) -> List[dns.rrset.RRset]:
+ self, actions: list[ChainAction], selectors: list[ChainSelector]
+ ) -> list[dns.rrset.RRset]:
all_responses, all_signatures = self._generate_rrsets(actions)
return self._select_rrsets(all_responses, all_signatures, selectors)
def _generate_rrsets(
- self, actions: List[ChainAction]
- ) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]:
+ self, actions: list[ChainAction]
+ ) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]:
all_responses = []
all_signatures = []
name_generator = ChainNameGenerator()
def _select_rrsets(
self,
- all_responses: List[dns.rrset.RRset],
- all_signatures: List[dns.rrset.RRset],
- selectors: List[ChainSelector],
- ) -> List[dns.rrset.RRset]:
+ all_responses: list[dns.rrset.RRset],
+ all_signatures: list[dns.rrset.RRset],
+ selectors: list[ChainSelector],
+ ) -> list[dns.rrset.RRset]:
rrsets = []
for selector in selectors:
domains = ["domain.nil."]
- def __init__(self, answer_rrsets: List[dns.rrset.RRset]):
+ def __init__(self, answer_rrsets: list[dns.rrset.RRset]):
super().__init__()
self._answer_rrsets = answer_rrsets
qctx.response.use_edns()
yield DnsResponseSend(qctx.response)
- def _non_chain_answer(self, qctx: QueryContext) -> List[dns.rrset.RRset]:
+ def _non_chain_answer(self, qctx: QueryContext) -> list[dns.rrset.RRset]:
owner = qctx.qname
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"),
]
@property
- def _authority_rrsets(self) -> List[dns.rrset.RRset]:
+ def _authority_rrsets(self) -> list[dns.rrset.RRset]:
owner = dns.name.from_text("domain.nil.")
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.NS, "ns1.domain.nil."),
]
@property
- def _additional_rrsets(self) -> List[dns.rrset.RRset]:
+ def _additional_rrsets(self) -> list[dns.rrset.RRset]:
owner = dns.name.from_text("ns1.domain.nil.")
return [
RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"),
# information regarding copyright ownership.
-from typing import NamedTuple, Tuple
+from typing import NamedTuple
import os
import sys
class CheckDSTest(NamedTuple):
zone: str
- logs_to_wait_for: Tuple[str]
+ logs_to_wait_for: tuple[str]
expected_parent_state: str
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import AsyncGenerator, List, Optional
+from typing import AsyncGenerator, Optional
import logging
class ErraticAxfrHandler(ResponseHandler):
allowed_actions = ["no-response", "partial-axfr", "complete-axfr"]
- def __init__(self, actions: List[str]) -> None:
+ def __init__(self, actions: list[str]) -> None:
self.actions = actions
self.counter = 0
for action in actions:
self._current_handler: Optional[ResponseHandler] = None
def handle(
- self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
+ self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> str:
for action in args:
if action not in ErraticAxfrHandler.allowed_actions:
AsyncGenerator,
Callable,
Coroutine,
- Dict,
- List,
Optional,
Sequence,
- Set,
- Tuple,
Union,
cast,
)
import dns.zone
_UdpHandler = Callable[
- [bytes, Tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None]
+ [bytes, tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None]
]
"""
self._transport = cast(asyncio.DatagramTransport, transport)
- def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
+ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
"""
Called by asyncio when a datagram is received.
"""
logging.info("Setting up IPv4 listener at %s:%d", ipv4_address, port)
logging.info("Setting up IPv6 listener at [%s]:%d", ipv6_address, port)
- self._ip_addresses: Tuple[str, str] = (ipv4_address, ipv6_address)
+ self._ip_addresses: tuple[str, str] = (ipv4_address, ipv6_address)
self._port: int = port
self._udp_handler: Optional[_UdpHandler] = udp_handler
self._tcp_handler: Optional[_TcpHandler] = tcp_handler
loop.set_exception_handler(self._handle_exception)
def _handle_exception(
- self, _: asyncio.AbstractEventLoop, context: Dict[str, Any]
+ self, _: asyncio.AbstractEventLoop, context: dict[str, Any]
) -> None:
assert self._work_done
exception = context.get("exception", RuntimeError(context["message"]))
client socket, effectively ignoring all incoming connections.
"""
- _connections: Set[asyncio.StreamWriter] = field(default_factory=set)
+ _connections: set[asyncio.StreamWriter] = field(default_factory=set)
async def handle(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, peer: Peer
@property
@abc.abstractmethod
- def qnames(self) -> List[str]:
+ def qnames(self) -> list[str]:
"""
A list of QNAMEs handled by this class.
"""
raise NotImplementedError
def __init__(self) -> None:
- self._qnames: List[dns.name.Name] = [dns.name.from_text(d) for d in self.qnames]
+ self._qnames: list[dns.name.Name] = [dns.name.from_text(d) for d in self.qnames]
def __str__(self) -> str:
return f"{self.__class__.__name__}(QNAMEs: {', '.join(self.qnames)})"
@property
@abc.abstractmethod
- def qtypes(self) -> List[dns.rdatatype.RdataType]:
+ def qtypes(self) -> list[dns.rdatatype.RdataType]:
"""
A list of QTYPEs handled by this class.
"""
def __init__(self) -> None:
super().__init__()
- self._qtypes: List[dns.rdatatype.RdataType] = self.qtypes
+ self._qtypes: list[dns.rdatatype.RdataType] = self.qtypes
def __str__(self) -> str:
return f"{self.__class__.__name__}(QNAMEs: {', '.join(self.qnames)}; QTYPEs: {', '.join(map(str, self.qtypes))})"
@property
@abc.abstractmethod
- def domains(self) -> List[str]:
+ def domains(self) -> list[str]:
"""
A list of domain names handled by this class.
"""
raise NotImplementedError
def __init__(self) -> None:
- self._domains: List[dns.name.Name] = sorted(
+ self._domains: list[dns.name.Name] = sorted(
[dns.name.from_text(d) for d in self.domains], reverse=True
)
self._matched_domain: Optional[dns.name.Name] = None
logging.debug("[OUT] %s", self._query.hex())
cast(asyncio.DatagramTransport, transport).sendto(self._query)
- def datagram_received(self, data: bytes, _: Tuple[str, int]) -> None:
+ def datagram_received(self, data: bytes, _: tuple[str, int]) -> None:
logging.debug("[IN] %s", data.hex())
self._response.set_result(data)
"""
zone: Optional[dns.zone.Zone]
- children: List["_ZoneTreeNode"] = field(default_factory=list)
+ children: list["_ZoneTreeNode"] = field(default_factory=list)
class _ZoneTree:
from failing on messages initialized with `dns.message.from_wire(keyring=False)`.
"""
- def sign(*_: Any, **__: Any) -> Tuple[dns.rdata.Rdata, None]:
+ def sign(*_: Any, **__: Any) -> tuple[dns.rdata.Rdata, None]:
assert self.tsig
return self.tsig[0], None
default_rcode: dns.rcode.Rcode = dns.rcode.REFUSED,
default_aa: bool = False,
keyring: Union[
- Dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
+ dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
] = _NoKeyringType(),
acknowledge_manual_dname_handling: bool = False,
) -> None:
self._zone_tree: _ZoneTree = _ZoneTree()
self._connection_handler: Optional[ConnectionHandler] = None
- self._response_handlers: List[ResponseHandler] = []
+ self._response_handlers: list[ResponseHandler] = []
self._default_rcode = default_rcode
self._default_aa = default_aa
self._keyring = keyring
raise ValueError(error)
async def _handle_udp(
- self, wire: bytes, addr: Tuple[str, int], transport: asyncio.DatagramTransport
+ self, wire: bytes, addr: tuple[str, int], transport: asyncio.DatagramTransport
) -> None:
logging.debug("Received UDP message: %s", wire.hex())
socket_info = transport.get_extra_info("sockname")
return dns.name.from_text(self._CONTROL_DOMAIN)
@functools.cached_property
- def _commands(self) -> Dict[dns.name.Name, "ControlCommand"]:
+ def _commands(self) -> dict[dns.name.Name, "ControlCommand"]:
return {}
def install_control_commands(self, *commands: "ControlCommand") -> None:
@abc.abstractmethod
def handle(
- self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
+ self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
"""
This method is expected to carry out arbitrary actions in response to a
self._current_handler: Optional[IgnoreAllQueries] = None
def handle(
- self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
+ self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
if len(args) != 1:
logging.error("Invalid %s query %s", self, qctx.qname)
control_subdomain = "switch"
- def __init__(self, handler_mapping: Dict[str, Sequence[ResponseHandler]]):
+ def __init__(self, handler_mapping: dict[str, Sequence[ResponseHandler]]):
self._handler_mapping = handler_mapping
def handle(
- self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
+ self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
if len(args) != 1 or args[0] not in self._handler_mapping:
logging.error("Invalid %s query %s", self, qctx.qname)
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import List, Optional, cast
+from typing import Optional, cast
import difflib
import os
def _extract_ede_options(
message: dns.message.Message,
-) -> List[EDEOption]:
+) -> list[EDEOption]:
"""Extract EDE options from the DNS message."""
return cast(
- List[EDEOption],
+ list[EDEOption],
[
option
for option in message.options
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import List, Union
+from typing import Union
from warnings import warn
import collections.abc
@composite
def _partition_bytes_to_labels(
draw, remaining_bytes: int, number_of_labels: int
-) -> List[int]:
+) -> list[int]:
two_bytes_reserved_for_label = 2
# Reserve two bytes for each label
# information regarding copyright ownership.
from pathlib import Path
-from typing import List, NamedTuple, Optional
+from typing import NamedTuple, Optional
import os
import re
watcher.wait_for_line("all zones loaded")
return cmd
- def stop(self, args: Optional[List[str]] = None) -> None:
+ def stop(self, args: Optional[list[str]] = None) -> None:
"""Stop the instance."""
args = args or []
perl(
[self.system_test_name, self.identifier] + args,
)
- def start(self, args: Optional[List[str]] = None) -> None:
+ def start(self, args: Optional[list[str]] = None) -> None:
"""Start the instance."""
args = args or []
perl(
from functools import total_ordering
from pathlib import Path
from re import compile as Re
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Optional, Union
import glob
import os
self,
name: str,
metadata: dict,
- timing: Dict[str, KeyTimingMetadata],
+ timing: dict[str, KeyTimingMetadata],
private: bool = True,
legacy: bool = False,
role: str = "csk",
"KSK": "yes",
"ZSK": "yes",
}
- timing: Dict[str, KeyTimingMetadata] = {}
+ timing: dict[str, KeyTimingMetadata] = {}
result = KeyProperties(name="DEFAULT", metadata=metadata, timing=timing)
result.name = "DEFAULT"
def get_signing_state(
self, offline_ksk=False, zsk_missing=False, smooth=False
- ) -> Tuple[bool, bool]:
+ ) -> tuple[bool, bool]:
"""
This returns the signing state derived from the key states, KRRSIGState
and ZRRSIGState.
def keydir_to_keylist(
zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False
-) -> List[Key]:
+) -> list[Key]:
"""
Retrieve all keys from the key files in a directory. If 'zone' is None,
retrieve all keys in the directory, otherwise only those matching the
return [k for k in all_keys if used(k)]
-def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
+def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> list[Key]:
return [Key(name, keydir) for name in keystr.split()]
-def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]:
+def policy_to_properties(ttl, keys: list[str]) -> list[KeyProperties]:
"""
Get the policies from a list of specially formatted strings.
The splitted line should result in the following items:
line = key.split()
# defaults
- metadata: Dict[str, Union[str, int]] = {}
- timing: Dict[str, KeyTimingMetadata] = {}
+ metadata: dict[str, Union[str, int]] = {}
+ timing: dict[str, KeyTimingMetadata] = {}
private = True
legacy = False
keytag_min = 0
LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s"
LOG_INDENT = 4
-LOGGERS = {
+LOGGERS: dict[str, logging.Logger | None] = {
"conftest": None,
"module": None,
"test": None,
-} # type: Dict[str, Optional[logging.Logger]]
+}
def init_conftest_logger():
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import Any, List, Match, Optional, Pattern, TextIO, TypeVar, Union
+from typing import Any, Match, Optional, Pattern, TextIO, TypeVar, Union
import abc
import os
from isctest.text import FlexPattern, LineReader, compile_pattern
T = TypeVar("T")
-OneOrMore = Union[T, List[T]]
+OneOrMore = Union[T, list[T]]
class WatchLogException(Exception):
self._timeout = timeout
self._deadline = 0.0
- def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> List[Pattern]:
+ def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> list[Pattern]:
self._wait_function_called = True
self._deadline = time.monotonic() + self._timeout
return self._prepare_patterns(patterns)
- def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> List[Pattern]:
+ def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> list[Pattern]:
"""
Convert a mix of string(s) and/or pattern(s) into a list of patterns.
patterns.append(compile_pattern(string))
return patterns
- def _wait_for_match(self, regexes: List[Pattern]) -> Match:
+ def _wait_for_match(self, regexes: list[Pattern]) -> Match:
if not self._reader:
raise WatchLogException(
"use WatchLog as context manager before calling wait_for_*() functions"
return self._wait_for_match(regexes)
- def wait_for_sequence(self, patterns: List[FlexPattern]) -> List[Match]:
+ def wait_for_sequence(self, patterns: list[FlexPattern]) -> list[Match]:
"""
Block execution until the specified pattern sequence is found in the
log file.
return matches
- def wait_for_all(self, patterns: List[FlexPattern]) -> List[Match]:
+ def wait_for_all(self, patterns: list[FlexPattern]) -> list[Match]:
"""
Block execution until all the specified patterns are found in the
log file in any order.
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import FrozenSet, Iterable
+from typing import Iterable
from dns.name import Name
return len(name) + sum(map(len, name.labels))
-def get_wildcard_names(names: Iterable[Name]) -> FrozenSet[Name]:
+def get_wildcard_names(names: Iterable[Name]) -> frozenset[Name]:
return frozenset(name for name in names if name.is_wild())
.union(self.reachable_dnames)
)
- def get_names_with_type(self, rdtype) -> FrozenSet[Name]:
+ def get_names_with_type(self, rdtype) -> frozenset[Name]:
return frozenset(
name for name in self.zone if self.zone.get_rdataset(name, rdtype)
)
self.reachable_delegations = frozenset(reachable_delegations)
self.occluded = frozenset(occluded)
- def generate_ents(self) -> FrozenSet[Name]:
+ def generate_ents(self) -> frozenset[Name]:
"""
Generate reachable names of empty nodes "between" all reachable
names with a RR and the origin.
# information regarding copyright ownership.
from pathlib import Path
-from typing import List, Optional
+from typing import Optional
import os
import subprocess
def _run_script(
interpreter: str,
script: str,
- args: Optional[List[str]] = None,
+ args: Optional[list[str]] = None,
):
if args is None:
args = []
isctest.log.debug(" exited with %d", returncode)
-def shell(script: str, args: Optional[List[str]] = None) -> None:
+def shell(script: str, args: Optional[list[str]] = None) -> None:
"""Run a given script with system's shell interpreter."""
_run_script(os.environ["SHELL"], script, args)
-def perl(script: str, args: Optional[List[str]] = None) -> None:
+def perl(script: str, args: Optional[list[str]] = None) -> None:
"""Run a given script with system's perl interpreter."""
_run_script(os.environ["PERL"], script, args)
from dataclasses import dataclass
from pathlib import Path
-from typing import Any, Dict, Optional, Union
+from typing import Any, Optional, Union
import jinja2
def render(
self,
output: str,
- data: Optional[Dict[str, Any]] = None,
+ data: Optional[dict[str, Any]] = None,
template: Optional[str] = None,
) -> None:
"""
stream = self.j2env.get_template(template).stream(data)
stream.dump(output, encoding="utf-8")
- def render_auto(self, data: Optional[Dict[str, Any]] = None):
+ def render_auto(self, data: Optional[dict[str, Any]] = None):
"""
Render all *.j2 templates with default (and optionally the provided)
values and write the output to files without the .j2 extensions.
# information regarding copyright ownership.
from re import compile as Re
-from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
+from typing import Iterator, Match, Optional, Pattern, TextIO, Union
import abc
import re
if match:
yield match
- def grep(self, pattern: FlexPattern) -> List[Match]:
+ def grep(self, pattern: FlexPattern) -> list[Match]:
"""
Get list of lines matching the pattern.
"""
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import Dict, List, NamedTuple, Optional, Union
+from typing import NamedTuple, Optional, Union
import os
import platform
class AlgorithmSet(NamedTuple):
"""Collection of DEFAULT, ALTERNATIVE and DISABLED algorithms"""
- default: Union[Algorithm, List[Algorithm]]
+ default: Union[Algorithm, list[Algorithm]]
"""DEFAULT is the algorithm for testing."""
- alternative: Union[Algorithm, List[Algorithm]]
+ alternative: Union[Algorithm, list[Algorithm]]
"""ALTERNATIVE is an alternative algorithm for test cases that require more
than one algorithm (for example algorithm rollover)."""
- disabled: Union[Algorithm, List[Algorithm]]
+ disabled: Union[Algorithm, list[Algorithm]]
"""DISABLED is an algorithm that is used for tests against the
"disable-algorithms" configuration option."""
"RSASHA512OID_SUPPORTED": "0",
}
-SUPPORTED_ALGORITHMS: List[Algorithm] = []
+SUPPORTED_ALGORITHMS: list[Algorithm] = []
def init_crypto_supported():
return AlgorithmSet(default, alternative, disabled)
-def _algorithms_env(algs: AlgorithmSet, name: str) -> Dict[str, str]:
+def _algorithms_env(algs: AlgorithmSet, name: str) -> dict[str, str]:
"""Return environment variables with selected algorithms as a dict."""
algs_env = {
"ALGORITHM_SET": name,
# information regarding copyright ownership.
from pathlib import Path
-from typing import Dict
SYSTEM_TEST_DIR_GIT_PATH = "bin/tests/system"
-def load_vars_from_build_files() -> Dict[str, str]:
+def load_vars_from_build_files() -> dict[str, str]:
# TOP_BUILDDIR is special, it is always read from the source directory
top_builddir_file = Path(__file__).resolve().parent / ".build_vars" / "TOP_BUILDDIR"
if not top_builddir_file.exists():
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import Dict
from isctest.instance import NamedInstance
from isctest.mark import live_internet_test
@live_internet_test
-def test_mirror_root_zone(servers: Dict[str, NamedInstance]):
+def test_mirror_root_zone(servers: dict[str, NamedInstance]):
"""
This test pulls the root zone from the Internet, so let's only run
it when CI_ENABLE_LIVE_INTERNET_TESTS is set.
from dataclasses import dataclass
from pathlib import Path
-from typing import Container, Iterable, Optional, Set, Tuple
+from typing import Container, Iterable, Optional
import os
def do_test_query(
qname: dns.name.Name, qtype: dns.rdatatype.RdataType, server: str, named_port: int
-) -> Tuple[dns.message.QueryMessage, "NSEC3Checker"]:
+) -> tuple[dns.message.QueryMessage, "NSEC3Checker"]:
query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
response = isctest.query.tcp(query, server, named_port, timeout=TIMEOUT)
isctest.check.is_response_to(response, query)
assert attrs_seen["algorithm"] is not None, f"no NSEC3 found\n{response}"
self.params: NSEC3Params = NSEC3Params(**attrs_seen)
self.response: dns.message.Message = response
- self.owners_present: Set[dns.name.Name] = owners_seen
- self.owners_used: Set[dns.name.Name] = set()
+ self.owners_present: set[dns.name.Name] = owners_seen
+ self.owners_used: set[dns.name.Name] = set()
@staticmethod
def nsec3_covers(rrset: dns.rrset.RRset, hashed_name: dns.name.Name) -> bool:
information regarding copyright ownership.
"""
-from typing import AsyncGenerator, Tuple, Union
+from typing import AsyncGenerator, Union
import dns.edns
import dns.name
def _cname_rrsets(
qname: Union[dns.name.Name, str],
-) -> Tuple[dns.rrset.RRset, dns.rrset.RRset]:
+) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
return (
rrset(qname, dns.rdatatype.CNAME, f"{qname}"),
rrset(qname, dns.rdatatype.A, "1.2.3.4"),
information regarding copyright ownership.
"""
-from typing import AsyncGenerator, List, NamedTuple, Union
+from typing import AsyncGenerator, NamedTuple, Union
import abc
def rrset_from_list(
qname: Union[dns.name.Name, str],
rtype: dns.rdatatype.RdataType,
- rdata_list: List[str],
+ rdata_list: list[str],
ttl: int = 300,
) -> dns.rrset.RRset:
return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list)
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import List
-
import shutil
from isctest.kasp import SettimeOptions, private_type_record
import isctest
-def configure_tld(zonename: str, delegations: List[Zone]) -> Zone:
+def configure_tld(zonename: str, delegations: list[Zone]) -> Zone:
templates = isctest.template.TemplateEngine(".")
alg = Algorithm.default()
keygen = EnvCmd("KEYGEN", f"-q -a {alg.number} -b {alg.bits} -L 3600")
return Zone(zonename, f"{outfile}.signed", Nameserver("ns2", "10.53.0.2"))
-def configure_root(delegations: List[Zone]) -> TrustAnchor:
+def configure_root(delegations: list[Zone]) -> TrustAnchor:
templates = isctest.template.TemplateEngine(".")
alg = Algorithm.default()
keygen = EnvCmd("KEYGEN", f"-q -a {alg.number} -b {alg.bits} -L 3600")
def render_and_sign_zone(
- zonename: str, keys: List[str], signing: bool = True, extra_options: str = ""
+ zonename: str, keys: list[str], signing: bool = True, extra_options: str = ""
):
dnskeys = []
privaterrs = []
)
-def configure_algo_csk(tld: str, policy: str, reconfig: bool = False) -> List[Zone]:
+def configure_algo_csk(tld: str, policy: str, reconfig: bool = False) -> list[Zone]:
# The zones at csk-algorithm-roll.$tld represent the various steps
# of a CSK algorithm rollover.
zones = []
return zones
-def configure_algo_ksk_zsk(tld: str, reconfig: bool = False) -> List[Zone]:
+def configure_algo_ksk_zsk(tld: str, reconfig: bool = False) -> list[Zone]:
# The zones at algorithm-roll.$tld represent the various steps of a ZSK/KSK
# algorithm rollover.
zones = []
return zones
-def configure_cskroll1(tld: str, policy: str) -> List[Zone]:
+def configure_cskroll1(tld: str, policy: str) -> list[Zone]:
# The zones at csk-roll1.$tld represent the various steps of a CSK rollover
# (which is essentially a ZSK Pre-Publication / KSK Double-KSK rollover).
zones = []
return zones
-def configure_cskroll2(tld: str, policy: str) -> List[Zone]:
+def configure_cskroll2(tld: str, policy: str) -> list[Zone]:
# The zones at csk-roll2.$tld represent the various steps of a CSK rollover
# (which is essentially a ZSK Pre-Publication / KSK Double-KSK rollover).
# This scenario differs from the csk-roll1 one because the zone signatures (ZRRSIG)
return zones
-def configure_enable_dnssec(tld: str, policy: str) -> List[Zone]:
+def configure_enable_dnssec(tld: str, policy: str) -> list[Zone]:
# The zones at enable-dnssec.$tld represent the various steps of the
# initial signing of a zone.
zones = []
return zones
-def configure_going_insecure(tld: str, reconfig: bool = False) -> List[Zone]:
+def configure_going_insecure(tld: str, reconfig: bool = False) -> list[Zone]:
zones = []
keygen = EnvCmd("KEYGEN", "-a ECDSA256 -L 7200")
return zones
-def configure_straight2none(tld: str) -> List[Zone]:
+def configure_straight2none(tld: str) -> list[Zone]:
# These zones are going straight to "none" policy. This is undefined behavior.
zones = []
keygen = EnvCmd("KEYGEN", "-k default")
return zones
-def configure_ksk_doubleksk(tld: str) -> List[Zone]:
+def configure_ksk_doubleksk(tld: str) -> list[Zone]:
# The zones at ksk-doubleksk.$tld represent the various steps of a KSK
# Double-KSK rollover.
zones = []
return zones
-def configure_ksk_3crowd(tld: str) -> List[Zone]:
+def configure_ksk_3crowd(tld: str) -> list[Zone]:
# Test #2375, the "three is a crowd" bug, where a new key is introduced but the
# previous rollover has not finished yet. In other words, we have a key KEY2
# that is the successor of key KEY1, and we introduce a new key KEY3 that is
return zones
-def configure_zsk_prepub(tld: str) -> List[Zone]:
+def configure_zsk_prepub(tld: str) -> list[Zone]:
# The zones at zsk-prepub.$tld represent the various steps of a ZSK
# Pre-Publication rollover.
zones = []
"F401",
# sorted __all__
"RUF022",
+ # unnecessary `typing` imports
+ "UP006",
# f-strings
"UP031",
"UP032",