From: Štěpán Balážik Date: Mon, 9 Feb 2026 14:33:22 +0000 (+0100) Subject: Built-in types are now subscriptable X-Git-Tag: v9.21.19~15^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=790745da18fb575ab48b2d5d48bf45e2054528f9;p=thirdparty%2Fbind9.git Built-in types are now subscriptable Generated with: ruff check --extend-select UP006 --fix --- diff --git a/bin/tests/system/bailiwick/bailiwick_ans.py b/bin/tests/system/bailiwick/bailiwick_ans.py index 28353a0a099..4c2f4f4e75a 100644 --- a/bin/tests/system/bailiwick/bailiwick_ans.py +++ b/bin/tests/system/bailiwick/bailiwick_ans.py @@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional information regarding copyright ownership. """ -from typing import Dict, List, Optional, Type +from typing import Optional import abc @@ -30,7 +30,7 @@ from isctest.asyncserver import ( class ResponseSpoofer(ResponseHandler, abc.ABC): - spoofers: Dict[str, Type["ResponseSpoofer"]] = {} + spoofers: dict[str, type["ResponseSpoofer"]] = {} def __init_subclass__(cls, mode: str) -> None: assert mode not in cls.spoofers @@ -69,7 +69,7 @@ class SetSpoofingModeCommand(ControlCommand): self._current_handler: Optional[ResponseSpoofer] = None def handle( - self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext + self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext ) -> Optional[str]: if len(args) != 1: qctx.response.set_rcode(dns.rcode.SERVFAIL) diff --git a/bin/tests/system/bailiwick/tests_bailiwick.py b/bin/tests/system/bailiwick/tests_bailiwick.py index 105fc6de5a1..3fc647992df 100644 --- a/bin/tests/system/bailiwick/tests_bailiwick.py +++ b/bin/tests/system/bailiwick/tests_bailiwick.py @@ -9,7 +9,6 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import Dict import time @@ -22,7 +21,7 @@ import isctest @pytest.fixture(autouse=True) -def autouse_flush_resolver_cache(servers: Dict[str, NamedInstance]) -> None: +def autouse_flush_resolver_cache(servers: dict[str, NamedInstance]) -> None: servers["ns4"].rndc("flush") @@ -78,7 +77,7 @@ def check_domain_hijack(ns4: NamedInstance) -> None: ) -def test_bailiwick_sibling_ns_referral(servers: Dict[str, NamedInstance]) -> None: +def test_bailiwick_sibling_ns_referral(servers: dict[str, NamedInstance]) -> None: set_spoofing_mode(ans1="sibling-ns", ans2="none") ns4 = servers["ns4"] @@ -86,7 +85,7 @@ def test_bailiwick_sibling_ns_referral(servers: Dict[str, NamedInstance]) -> Non check_domain_hijack(ns4) -def test_bailiwick_unsolicited_authority(servers: Dict[str, NamedInstance]) -> None: +def test_bailiwick_unsolicited_authority(servers: dict[str, NamedInstance]) -> None: set_spoofing_mode(ans1="none", ans2="unsolicited-ns") ns4 = servers["ns4"] @@ -95,7 +94,7 @@ def test_bailiwick_unsolicited_authority(servers: Dict[str, NamedInstance]) -> N check_domain_hijack(ns4) -def test_bailiwick_parent_glue(servers: Dict[str, NamedInstance]) -> None: +def test_bailiwick_parent_glue(servers: dict[str, NamedInstance]) -> None: set_spoofing_mode(ans1="none", ans2="parent-glue") ns4 = servers["ns4"] @@ -108,7 +107,7 @@ def test_bailiwick_parent_glue(servers: Dict[str, NamedInstance]) -> None: check_domain_hijack(ns4) -def test_bailiwick_spoofed_dname(servers: Dict[str, NamedInstance]) -> None: +def test_bailiwick_spoofed_dname(servers: dict[str, NamedInstance]) -> None: set_spoofing_mode(ans1="none", ans2="dname") ns4 = servers["ns4"] diff --git a/bin/tests/system/chain/ans4/ans.py b/bin/tests/system/chain/ans4/ans.py index 2f0d8c3352e..ebe1b6b746e 100755 --- a/bin/tests/system/chain/ans4/ans.py +++ b/bin/tests/system/chain/ans4/ans.py @@ -13,7 +13,7 @@ information regarding copyright ownership. from dataclasses import dataclass from enum import Enum -from typing import AsyncGenerator, List, Optional, Tuple +from typing import AsyncGenerator, Optional import abc import logging @@ -123,7 +123,7 @@ class RecordGenerator(abc.ABC): def __init__(self, name_generator: ChainNameGenerator) -> None: self._name_generator = name_generator - def get_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]: + def get_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]: """ Return the lists of records and their signatures that should be generated in response to a given "action". @@ -155,7 +155,7 @@ class RecordGenerator(abc.ABC): raise NotImplementedError @abc.abstractmethod - def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]: + def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]: """ Return the lists of records and their signatures that should be generated in response to a given "action". @@ -170,7 +170,7 @@ class CnameRecordGenerator(RecordGenerator): response_count = 1 - def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]: + def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]: owner = self._name_generator.current_name target = self._name_generator.generate_next_name().to_text() response = self.create_rrset(owner, dns.rdatatype.CNAME, target) @@ -182,7 +182,7 @@ class DnameRecordGenerator(RecordGenerator): response_count = 2 - def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]: + def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]: dname_owner = self._name_generator.current_domain cname_owner = self._name_generator.current_name dname_target = self._name_generator.generate_next_sld().to_text() @@ -206,7 +206,7 @@ class XnameRecordGenerator(RecordGenerator): response_count = 1 - def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]: + def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]: owner = self._name_generator.current_name target = self._name_generator.generate_next_name_in_next_sld().to_text() response = self.create_rrset(owner, dns.rdatatype.CNAME, target) @@ -218,7 +218,7 @@ class FinalRecordGenerator(RecordGenerator): response_count = 1 - def generate_rrsets(self) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]: + def generate_rrsets(self) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]: owner = self._name_generator.current_name response = self.create_rrset(owner, dns.rdatatype.A, "10.53.0.4") signature = self.create_rrset_signature(owner, response.rdtype) @@ -300,7 +300,7 @@ class ChainSetupCommand(ControlCommand): self._current_handler: Optional[ChainResponseHandler] = None def handle( - self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext + self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext ) -> Optional[str]: try: actions, selectors = self._parse_args(args) @@ -320,8 +320,8 @@ class ChainSetupCommand(ControlCommand): return "chain response setup successful" def _parse_args( - self, args: List[str] - ) -> Tuple[List[ChainAction], List[ChainSelector]]: + self, args: list[str] + ) -> tuple[list[ChainAction], list[ChainSelector]]: try: delimiter = args.index("_") except ValueError as exc: @@ -335,7 +335,7 @@ class ChainSetupCommand(ControlCommand): return actions, selectors - def _parse_args_actions(self, args_actions: List[str]) -> List[ChainAction]: + def _parse_args_actions(self, args_actions: list[str]) -> list[ChainAction]: actions = [] for action in args_actions + ["FINAL"]: @@ -347,8 +347,8 @@ class ChainSetupCommand(ControlCommand): return actions def _parse_args_selectors( - self, args_selectors: List[str], actions: List[ChainAction] - ) -> List[ChainSelector]: + self, args_selectors: list[str], actions: list[ChainAction] + ) -> list[ChainSelector]: max_response_index = self._get_max_response_index(actions) selectors = [] @@ -366,19 +366,19 @@ class ChainSetupCommand(ControlCommand): return selectors - def _get_max_response_index(self, actions: List[ChainAction]) -> int: + def _get_max_response_index(self, actions: list[ChainAction]) -> int: rrset_generator_classes = [a.value for a in actions] return sum(g.response_count for g in rrset_generator_classes) def _prepare_answer( - self, actions: List[ChainAction], selectors: List[ChainSelector] - ) -> List[dns.rrset.RRset]: + self, actions: list[ChainAction], selectors: list[ChainSelector] + ) -> list[dns.rrset.RRset]: all_responses, all_signatures = self._generate_rrsets(actions) return self._select_rrsets(all_responses, all_signatures, selectors) def _generate_rrsets( - self, actions: List[ChainAction] - ) -> Tuple[List[dns.rrset.RRset], List[dns.rrset.RRset]]: + self, actions: list[ChainAction] + ) -> tuple[list[dns.rrset.RRset], list[dns.rrset.RRset]]: all_responses = [] all_signatures = [] name_generator = ChainNameGenerator() @@ -394,10 +394,10 @@ class ChainSetupCommand(ControlCommand): def _select_rrsets( self, - all_responses: List[dns.rrset.RRset], - all_signatures: List[dns.rrset.RRset], - selectors: List[ChainSelector], - ) -> List[dns.rrset.RRset]: + all_responses: list[dns.rrset.RRset], + all_signatures: list[dns.rrset.RRset], + selectors: list[ChainSelector], + ) -> list[dns.rrset.RRset]: rrsets = [] for selector in selectors: @@ -418,7 +418,7 @@ class ChainResponseHandler(DomainHandler): domains = ["domain.nil."] - def __init__(self, answer_rrsets: List[dns.rrset.RRset]): + def __init__(self, answer_rrsets: list[dns.rrset.RRset]): super().__init__() self._answer_rrsets = answer_rrsets @@ -441,7 +441,7 @@ class ChainResponseHandler(DomainHandler): qctx.response.use_edns() yield DnsResponseSend(qctx.response) - def _non_chain_answer(self, qctx: QueryContext) -> List[dns.rrset.RRset]: + def _non_chain_answer(self, qctx: QueryContext) -> list[dns.rrset.RRset]: owner = qctx.qname return [ RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"), @@ -449,14 +449,14 @@ class ChainResponseHandler(DomainHandler): ] @property - def _authority_rrsets(self) -> List[dns.rrset.RRset]: + def _authority_rrsets(self) -> list[dns.rrset.RRset]: owner = dns.name.from_text("domain.nil.") return [ RecordGenerator.create_rrset(owner, dns.rdatatype.NS, "ns1.domain.nil."), ] @property - def _additional_rrsets(self) -> List[dns.rrset.RRset]: + def _additional_rrsets(self) -> list[dns.rrset.RRset]: owner = dns.name.from_text("ns1.domain.nil.") return [ RecordGenerator.create_rrset(owner, dns.rdatatype.A, "10.53.0.4"), diff --git a/bin/tests/system/checkds/tests_checkds.py b/bin/tests/system/checkds/tests_checkds.py index 037517ca235..245ed5bf623 100755 --- a/bin/tests/system/checkds/tests_checkds.py +++ b/bin/tests/system/checkds/tests_checkds.py @@ -12,7 +12,7 @@ # information regarding copyright ownership. -from typing import NamedTuple, Tuple +from typing import NamedTuple import os import sys @@ -189,7 +189,7 @@ def keystate_check(server, zone, key): class CheckDSTest(NamedTuple): zone: str - logs_to_wait_for: Tuple[str] + logs_to_wait_for: tuple[str] expected_parent_state: str diff --git a/bin/tests/system/digdelv/ans5/ans.py b/bin/tests/system/digdelv/ans5/ans.py index 6039559449f..a897a8bfd2f 100644 --- a/bin/tests/system/digdelv/ans5/ans.py +++ b/bin/tests/system/digdelv/ans5/ans.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import AsyncGenerator, List, Optional +from typing import AsyncGenerator, Optional import logging @@ -31,7 +31,7 @@ from isctest.asyncserver import ( class ErraticAxfrHandler(ResponseHandler): allowed_actions = ["no-response", "partial-axfr", "complete-axfr"] - def __init__(self, actions: List[str]) -> None: + def __init__(self, actions: list[str]) -> None: self.actions = actions self.counter = 0 for action in actions: @@ -72,7 +72,7 @@ class ResponseSequenceCommand(ControlCommand): self._current_handler: Optional[ResponseHandler] = None def handle( - self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext + self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext ) -> str: for action in args: if action not in ErraticAxfrHandler.allowed_actions: diff --git a/bin/tests/system/isctest/asyncserver.py b/bin/tests/system/isctest/asyncserver.py index 72979399a61..8cb9efc6058 100644 --- a/bin/tests/system/isctest/asyncserver.py +++ b/bin/tests/system/isctest/asyncserver.py @@ -17,12 +17,8 @@ from typing import ( AsyncGenerator, Callable, Coroutine, - Dict, - List, Optional, Sequence, - Set, - Tuple, Union, cast, ) @@ -57,7 +53,7 @@ import dns.version import dns.zone _UdpHandler = Callable[ - [bytes, Tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None] + [bytes, tuple[str, int], asyncio.DatagramTransport], Coroutine[Any, Any, None] ] @@ -84,7 +80,7 @@ class _AsyncUdpHandler(asyncio.DatagramProtocol): """ self._transport = cast(asyncio.DatagramTransport, transport) - def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: """ Called by asyncio when a datagram is received. """ @@ -133,7 +129,7 @@ class AsyncServer: logging.info("Setting up IPv4 listener at %s:%d", ipv4_address, port) logging.info("Setting up IPv6 listener at [%s]:%d", ipv6_address, port) - self._ip_addresses: Tuple[str, str] = (ipv4_address, ipv6_address) + self._ip_addresses: tuple[str, str] = (ipv4_address, ipv6_address) self._port: int = port self._udp_handler: Optional[_UdpHandler] = udp_handler self._tcp_handler: Optional[_TcpHandler] = tcp_handler @@ -186,7 +182,7 @@ class AsyncServer: loop.set_exception_handler(self._handle_exception) def _handle_exception( - self, _: asyncio.AbstractEventLoop, context: Dict[str, Any] + self, _: asyncio.AbstractEventLoop, context: dict[str, Any] ) -> None: assert self._work_done exception = context.get("exception", RuntimeError(context["message"])) @@ -512,7 +508,7 @@ class IgnoreAllConnections(ConnectionHandler): client socket, effectively ignoring all incoming connections. """ - _connections: Set[asyncio.StreamWriter] = field(default_factory=set) + _connections: set[asyncio.StreamWriter] = field(default_factory=set) async def handle( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, peer: Peer @@ -623,14 +619,14 @@ class QnameHandler(ResponseHandler): @property @abc.abstractmethod - def qnames(self) -> List[str]: + def qnames(self) -> list[str]: """ A list of QNAMEs handled by this class. """ raise NotImplementedError def __init__(self) -> None: - self._qnames: List[dns.name.Name] = [dns.name.from_text(d) for d in self.qnames] + self._qnames: list[dns.name.Name] = [dns.name.from_text(d) for d in self.qnames] def __str__(self) -> str: return f"{self.__class__.__name__}(QNAMEs: {', '.join(self.qnames)})" @@ -653,7 +649,7 @@ class QnameQtypeHandler(QnameHandler): @property @abc.abstractmethod - def qtypes(self) -> List[dns.rdatatype.RdataType]: + def qtypes(self) -> list[dns.rdatatype.RdataType]: """ A list of QTYPEs handled by this class. """ @@ -661,7 +657,7 @@ class QnameQtypeHandler(QnameHandler): def __init__(self) -> None: super().__init__() - self._qtypes: List[dns.rdatatype.RdataType] = self.qtypes + self._qtypes: list[dns.rdatatype.RdataType] = self.qtypes def __str__(self) -> str: return f"{self.__class__.__name__}(QNAMEs: {', '.join(self.qnames)}; QTYPEs: {', '.join(map(str, self.qtypes))})" @@ -755,14 +751,14 @@ class DomainHandler(ResponseHandler): @property @abc.abstractmethod - def domains(self) -> List[str]: + def domains(self) -> list[str]: """ A list of domain names handled by this class. """ raise NotImplementedError def __init__(self) -> None: - self._domains: List[dns.name.Name] = sorted( + self._domains: list[dns.name.Name] = sorted( [dns.name.from_text(d) for d in self.domains], reverse=True ) self._matched_domain: Optional[dns.name.Name] = None @@ -833,7 +829,7 @@ class ForwarderHandler(ResponseHandler): logging.debug("[OUT] %s", self._query.hex()) cast(asyncio.DatagramTransport, transport).sendto(self._query) - def datagram_received(self, data: bytes, _: Tuple[str, int]) -> None: + def datagram_received(self, data: bytes, _: tuple[str, int]) -> None: logging.debug("[IN] %s", data.hex()) self._response.set_result(data) @@ -897,7 +893,7 @@ class _ZoneTreeNode: """ zone: Optional[dns.zone.Zone] - children: List["_ZoneTreeNode"] = field(default_factory=list) + children: list["_ZoneTreeNode"] = field(default_factory=list) class _ZoneTree: @@ -977,7 +973,7 @@ class _DnsMessageWithTsigDisabled(dns.message.Message): from failing on messages initialized with `dns.message.from_wire(keyring=False)`. """ - def sign(*_: Any, **__: Any) -> Tuple[dns.rdata.Rdata, None]: + def sign(*_: Any, **__: Any) -> tuple[dns.rdata.Rdata, None]: assert self.tsig return self.tsig[0], None @@ -1055,7 +1051,7 @@ class AsyncDnsServer(AsyncServer): default_rcode: dns.rcode.Rcode = dns.rcode.REFUSED, default_aa: bool = False, keyring: Union[ - Dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType + dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType ] = _NoKeyringType(), acknowledge_manual_dname_handling: bool = False, ) -> None: @@ -1063,7 +1059,7 @@ class AsyncDnsServer(AsyncServer): self._zone_tree: _ZoneTree = _ZoneTree() self._connection_handler: Optional[ConnectionHandler] = None - self._response_handlers: List[ResponseHandler] = [] + self._response_handlers: list[ResponseHandler] = [] self._default_rcode = default_rcode self._default_aa = default_aa self._keyring = keyring @@ -1172,7 +1168,7 @@ class AsyncDnsServer(AsyncServer): raise ValueError(error) async def _handle_udp( - self, wire: bytes, addr: Tuple[str, int], transport: asyncio.DatagramTransport + self, wire: bytes, addr: tuple[str, int], transport: asyncio.DatagramTransport ) -> None: logging.debug("Received UDP message: %s", wire.hex()) socket_info = transport.get_extra_info("sockname") @@ -1592,7 +1588,7 @@ class ControllableAsyncDnsServer(AsyncDnsServer): return dns.name.from_text(self._CONTROL_DOMAIN) @functools.cached_property - def _commands(self) -> Dict[dns.name.Name, "ControlCommand"]: + def _commands(self) -> dict[dns.name.Name, "ControlCommand"]: return {} def install_control_commands(self, *commands: "ControlCommand") -> None: @@ -1703,7 +1699,7 @@ class ControlCommand(abc.ABC): @abc.abstractmethod def handle( - self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext + self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext ) -> Optional[str]: """ This method is expected to carry out arbitrary actions in response to a @@ -1745,7 +1741,7 @@ class ToggleResponsesCommand(ControlCommand): self._current_handler: Optional[IgnoreAllQueries] = None def handle( - self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext + self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext ) -> Optional[str]: if len(args) != 1: logging.error("Invalid %s query %s", self, qctx.qname) @@ -1785,11 +1781,11 @@ class SwitchControlCommand(ControlCommand): control_subdomain = "switch" - def __init__(self, handler_mapping: Dict[str, Sequence[ResponseHandler]]): + def __init__(self, handler_mapping: dict[str, Sequence[ResponseHandler]]): self._handler_mapping = handler_mapping def handle( - self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext + self, args: list[str], server: ControllableAsyncDnsServer, qctx: QueryContext ) -> Optional[str]: if len(args) != 1 or args[0] not in self._handler_mapping: logging.error("Invalid %s query %s", self, qctx.qname) diff --git a/bin/tests/system/isctest/check.py b/bin/tests/system/isctest/check.py index c0a550b42d4..6e6a894c2c8 100644 --- a/bin/tests/system/isctest/check.py +++ b/bin/tests/system/isctest/check.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import List, Optional, cast +from typing import Optional, cast import difflib import os @@ -72,10 +72,10 @@ def noraflag(message: dns.message.Message) -> None: def _extract_ede_options( message: dns.message.Message, -) -> List[EDEOption]: +) -> list[EDEOption]: """Extract EDE options from the DNS message.""" return cast( - List[EDEOption], + list[EDEOption], [ option for option in message.options diff --git a/bin/tests/system/isctest/hypothesis/strategies.py b/bin/tests/system/isctest/hypothesis/strategies.py index 1a008518890..74a86c6a644 100644 --- a/bin/tests/system/isctest/hypothesis/strategies.py +++ b/bin/tests/system/isctest/hypothesis/strategies.py @@ -11,7 +11,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import List, Union +from typing import Union from warnings import warn import collections.abc @@ -156,7 +156,7 @@ dns_rdatatypes_without_meta = integers(0, dns.rdatatype.OPT - 1) | integers(dns. @composite def _partition_bytes_to_labels( draw, remaining_bytes: int, number_of_labels: int -) -> List[int]: +) -> list[int]: two_bytes_reserved_for_label = 2 # Reserve two bytes for each label diff --git a/bin/tests/system/isctest/instance.py b/bin/tests/system/isctest/instance.py index 58e027fe8ef..a50daa86444 100644 --- a/bin/tests/system/isctest/instance.py +++ b/bin/tests/system/isctest/instance.py @@ -12,7 +12,7 @@ # information regarding copyright ownership. from pathlib import Path -from typing import List, NamedTuple, Optional +from typing import NamedTuple, Optional import os import re @@ -175,7 +175,7 @@ class NamedInstance: watcher.wait_for_line("all zones loaded") return cmd - def stop(self, args: Optional[List[str]] = None) -> None: + def stop(self, args: Optional[list[str]] = None) -> None: """Stop the instance.""" args = args or [] perl( @@ -183,7 +183,7 @@ class NamedInstance: [self.system_test_name, self.identifier] + args, ) - def start(self, args: Optional[List[str]] = None) -> None: + def start(self, args: Optional[list[str]] = None) -> None: """Start the instance.""" args = args or [] perl( diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index e7227f7c6a9..5e8b1877c7d 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -14,7 +14,7 @@ from datetime import datetime, timedelta, timezone from functools import total_ordering from pathlib import Path from re import compile as Re -from typing import Dict, List, Optional, Tuple, Union +from typing import Optional, Union import glob import os @@ -180,7 +180,7 @@ class KeyProperties: self, name: str, metadata: dict, - timing: Dict[str, KeyTimingMetadata], + timing: dict[str, KeyTimingMetadata], private: bool = True, legacy: bool = False, role: str = "csk", @@ -219,7 +219,7 @@ class KeyProperties: "KSK": "yes", "ZSK": "yes", } - timing: Dict[str, KeyTimingMetadata] = {} + timing: dict[str, KeyTimingMetadata] = {} result = KeyProperties(name="DEFAULT", metadata=metadata, timing=timing) result.name = "DEFAULT" @@ -439,7 +439,7 @@ class Key: def get_signing_state( self, offline_ksk=False, zsk_missing=False, smooth=False - ) -> Tuple[bool, bool]: + ) -> tuple[bool, bool]: """ This returns the signing state derived from the key states, KRRSIGState and ZRRSIGState. @@ -1596,7 +1596,7 @@ def next_key_event_equals(server, zone, next_event): def keydir_to_keylist( zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False -) -> List[Key]: +) -> list[Key]: """ Retrieve all keys from the key files in a directory. If 'zone' is None, retrieve all keys in the directory, otherwise only those matching the @@ -1637,11 +1637,11 @@ def keydir_to_keylist( return [k for k in all_keys if used(k)] -def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]: +def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> list[Key]: return [Key(name, keydir) for name in keystr.split()] -def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]: +def policy_to_properties(ttl, keys: list[str]) -> list[KeyProperties]: """ Get the policies from a list of specially formatted strings. The splitted line should result in the following items: @@ -1663,8 +1663,8 @@ def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]: line = key.split() # defaults - metadata: Dict[str, Union[str, int]] = {} - timing: Dict[str, KeyTimingMetadata] = {} + metadata: dict[str, Union[str, int]] = {} + timing: dict[str, KeyTimingMetadata] = {} private = True legacy = False keytag_min = 0 diff --git a/bin/tests/system/isctest/log/basic.py b/bin/tests/system/isctest/log/basic.py index 762741f5710..981049b6262 100644 --- a/bin/tests/system/isctest/log/basic.py +++ b/bin/tests/system/isctest/log/basic.py @@ -17,11 +17,11 @@ import textwrap LOG_FORMAT = "%(asctime)s %(levelname)7s:%(name)s %(message)s" LOG_INDENT = 4 -LOGGERS = { +LOGGERS: dict[str, logging.Logger | None] = { "conftest": None, "module": None, "test": None, -} # type: Dict[str, Optional[logging.Logger]] +} def init_conftest_logger(): diff --git a/bin/tests/system/isctest/log/watchlog.py b/bin/tests/system/isctest/log/watchlog.py index 5c2bab0f202..24f298f118a 100644 --- a/bin/tests/system/isctest/log/watchlog.py +++ b/bin/tests/system/isctest/log/watchlog.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import Any, List, Match, Optional, Pattern, TextIO, TypeVar, Union +from typing import Any, Match, Optional, Pattern, TextIO, TypeVar, Union import abc import os @@ -18,7 +18,7 @@ import time from isctest.text import FlexPattern, LineReader, compile_pattern T = TypeVar("T") -OneOrMore = Union[T, List[T]] +OneOrMore = Union[T, list[T]] class WatchLogException(Exception): @@ -71,12 +71,12 @@ class WatchLog(abc.ABC): self._timeout = timeout self._deadline = 0.0 - def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> List[Pattern]: + def _setup_wait(self, patterns: OneOrMore[FlexPattern]) -> list[Pattern]: self._wait_function_called = True self._deadline = time.monotonic() + self._timeout return self._prepare_patterns(patterns) - def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> List[Pattern]: + def _prepare_patterns(self, strings: OneOrMore[FlexPattern]) -> list[Pattern]: """ Convert a mix of string(s) and/or pattern(s) into a list of patterns. @@ -90,7 +90,7 @@ class WatchLog(abc.ABC): patterns.append(compile_pattern(string)) return patterns - def _wait_for_match(self, regexes: List[Pattern]) -> Match: + def _wait_for_match(self, regexes: list[Pattern]) -> Match: if not self._reader: raise WatchLogException( "use WatchLog as context manager before calling wait_for_*() functions" @@ -209,7 +209,7 @@ class WatchLog(abc.ABC): return self._wait_for_match(regexes) - def wait_for_sequence(self, patterns: List[FlexPattern]) -> List[Match]: + def wait_for_sequence(self, patterns: list[FlexPattern]) -> list[Match]: """ Block execution until the specified pattern sequence is found in the log file. @@ -285,7 +285,7 @@ class WatchLog(abc.ABC): return matches - def wait_for_all(self, patterns: List[FlexPattern]) -> List[Match]: + def wait_for_all(self, patterns: list[FlexPattern]) -> list[Match]: """ Block execution until all the specified patterns are found in the log file in any order. diff --git a/bin/tests/system/isctest/name.py b/bin/tests/system/isctest/name.py index cc80f44d795..e0b23805ac0 100644 --- a/bin/tests/system/isctest/name.py +++ b/bin/tests/system/isctest/name.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import FrozenSet, Iterable +from typing import Iterable from dns.name import Name @@ -26,7 +26,7 @@ def len_wire_uncompressed(name: Name) -> int: return len(name) + sum(map(len, name.labels)) -def get_wildcard_names(names: Iterable[Name]) -> FrozenSet[Name]: +def get_wildcard_names(names: Iterable[Name]) -> frozenset[Name]: return frozenset(name for name in names if name.is_wild()) @@ -84,7 +84,7 @@ class ZoneAnalyzer: .union(self.reachable_dnames) ) - def get_names_with_type(self, rdtype) -> FrozenSet[Name]: + def get_names_with_type(self, rdtype) -> frozenset[Name]: return frozenset( name for name in self.zone if self.zone.get_rdataset(name, rdtype) ) @@ -148,7 +148,7 @@ class ZoneAnalyzer: self.reachable_delegations = frozenset(reachable_delegations) self.occluded = frozenset(occluded) - def generate_ents(self) -> FrozenSet[Name]: + def generate_ents(self) -> frozenset[Name]: """ Generate reachable names of empty nodes "between" all reachable names with a RR and the origin. diff --git a/bin/tests/system/isctest/run.py b/bin/tests/system/isctest/run.py index 7c7559726d2..8108c60f77f 100644 --- a/bin/tests/system/isctest/run.py +++ b/bin/tests/system/isctest/run.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. from pathlib import Path -from typing import List, Optional +from typing import Optional import os import subprocess @@ -98,7 +98,7 @@ class EnvCmd: def _run_script( interpreter: str, script: str, - args: Optional[List[str]] = None, + args: Optional[list[str]] = None, ): if args is None: args = [] @@ -130,12 +130,12 @@ def _run_script( isctest.log.debug(" exited with %d", returncode) -def shell(script: str, args: Optional[List[str]] = None) -> None: +def shell(script: str, args: Optional[list[str]] = None) -> None: """Run a given script with system's shell interpreter.""" _run_script(os.environ["SHELL"], script, args) -def perl(script: str, args: Optional[List[str]] = None) -> None: +def perl(script: str, args: Optional[list[str]] = None) -> None: """Run a given script with system's perl interpreter.""" _run_script(os.environ["PERL"], script, args) diff --git a/bin/tests/system/isctest/template.py b/bin/tests/system/isctest/template.py index f454dca7d03..cf6e7e93d87 100644 --- a/bin/tests/system/isctest/template.py +++ b/bin/tests/system/isctest/template.py @@ -13,7 +13,7 @@ from dataclasses import dataclass from pathlib import Path -from typing import Any, Dict, Optional, Union +from typing import Any, Optional, Union import jinja2 @@ -44,7 +44,7 @@ class TemplateEngine: def render( self, output: str, - data: Optional[Dict[str, Any]] = None, + data: Optional[dict[str, Any]] = None, template: Optional[str] = None, ) -> None: """ @@ -69,7 +69,7 @@ class TemplateEngine: stream = self.j2env.get_template(template).stream(data) stream.dump(output, encoding="utf-8") - def render_auto(self, data: Optional[Dict[str, Any]] = None): + def render_auto(self, data: Optional[dict[str, Any]] = None): """ Render all *.j2 templates with default (and optionally the provided) values and write the output to files without the .j2 extensions. diff --git a/bin/tests/system/isctest/text.py b/bin/tests/system/isctest/text.py index 549f06ff7f3..9048e4630f8 100644 --- a/bin/tests/system/isctest/text.py +++ b/bin/tests/system/isctest/text.py @@ -12,7 +12,7 @@ # information regarding copyright ownership. from re import compile as Re -from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union +from typing import Iterator, Match, Optional, Pattern, TextIO, Union import abc import re @@ -48,7 +48,7 @@ class Grep(abc.ABC): if match: yield match - def grep(self, pattern: FlexPattern) -> List[Match]: + def grep(self, pattern: FlexPattern) -> list[Match]: """ Get list of lines matching the pattern. """ diff --git a/bin/tests/system/isctest/vars/algorithms.py b/bin/tests/system/isctest/vars/algorithms.py index c0a2eed5c1b..1630e8215ba 100644 --- a/bin/tests/system/isctest/vars/algorithms.py +++ b/bin/tests/system/isctest/vars/algorithms.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import Dict, List, NamedTuple, Optional, Union +from typing import NamedTuple, Optional, Union import os import platform @@ -74,14 +74,14 @@ class Algorithm(NamedTuple): class AlgorithmSet(NamedTuple): """Collection of DEFAULT, ALTERNATIVE and DISABLED algorithms""" - default: Union[Algorithm, List[Algorithm]] + default: Union[Algorithm, list[Algorithm]] """DEFAULT is the algorithm for testing.""" - alternative: Union[Algorithm, List[Algorithm]] + alternative: Union[Algorithm, list[Algorithm]] """ALTERNATIVE is an alternative algorithm for test cases that require more than one algorithm (for example algorithm rollover).""" - disabled: Union[Algorithm, List[Algorithm]] + disabled: Union[Algorithm, list[Algorithm]] """DISABLED is an algorithm that is used for tests against the "disable-algorithms" configuration option.""" @@ -171,7 +171,7 @@ CRYPTO_SUPPORTED_VARS = { "RSASHA512OID_SUPPORTED": "0", } -SUPPORTED_ALGORITHMS: List[Algorithm] = [] +SUPPORTED_ALGORITHMS: list[Algorithm] = [] def init_crypto_supported(): @@ -261,7 +261,7 @@ def _select_random(algs: AlgorithmSet, stable_period=STABLE_PERIOD) -> Algorithm return AlgorithmSet(default, alternative, disabled) -def _algorithms_env(algs: AlgorithmSet, name: str) -> Dict[str, str]: +def _algorithms_env(algs: AlgorithmSet, name: str) -> dict[str, str]: """Return environment variables with selected algorithms as a dict.""" algs_env = { "ALGORITHM_SET": name, diff --git a/bin/tests/system/isctest/vars/build.py b/bin/tests/system/isctest/vars/build.py index aa251e0e6b3..18ceca9a00b 100644 --- a/bin/tests/system/isctest/vars/build.py +++ b/bin/tests/system/isctest/vars/build.py @@ -10,12 +10,11 @@ # information regarding copyright ownership. from pathlib import Path -from typing import Dict SYSTEM_TEST_DIR_GIT_PATH = "bin/tests/system" -def load_vars_from_build_files() -> Dict[str, str]: +def load_vars_from_build_files() -> dict[str, str]: # TOP_BUILDDIR is special, it is always read from the source directory top_builddir_file = Path(__file__).resolve().parent / ".build_vars" / "TOP_BUILDDIR" if not top_builddir_file.exists(): diff --git a/bin/tests/system/mirror-root-zone/tests_mirror_root_zone.py b/bin/tests/system/mirror-root-zone/tests_mirror_root_zone.py index 31fdf8dd50a..fd638b88bc8 100644 --- a/bin/tests/system/mirror-root-zone/tests_mirror_root_zone.py +++ b/bin/tests/system/mirror-root-zone/tests_mirror_root_zone.py @@ -9,14 +9,13 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import Dict from isctest.instance import NamedInstance from isctest.mark import live_internet_test @live_internet_test -def test_mirror_root_zone(servers: Dict[str, NamedInstance]): +def test_mirror_root_zone(servers: dict[str, NamedInstance]): """ This test pulls the root zone from the Internet, so let's only run it when CI_ENABLE_LIVE_INTERNET_TESTS is set. diff --git a/bin/tests/system/nsec3-answer/tests_nsec3.py b/bin/tests/system/nsec3-answer/tests_nsec3.py index c926d053f19..7dd7782be75 100755 --- a/bin/tests/system/nsec3-answer/tests_nsec3.py +++ b/bin/tests/system/nsec3-answer/tests_nsec3.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from pathlib import Path -from typing import Container, Iterable, Optional, Set, Tuple +from typing import Container, Iterable, Optional import os @@ -63,7 +63,7 @@ def is_related_to_any( def do_test_query( qname: dns.name.Name, qtype: dns.rdatatype.RdataType, server: str, named_port: int -) -> Tuple[dns.message.QueryMessage, "NSEC3Checker"]: +) -> tuple[dns.message.QueryMessage, "NSEC3Checker"]: query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) response = isctest.query.tcp(query, server, named_port, timeout=TIMEOUT) isctest.check.is_response_to(response, query) @@ -349,8 +349,8 @@ class NSEC3Checker: assert attrs_seen["algorithm"] is not None, f"no NSEC3 found\n{response}" self.params: NSEC3Params = NSEC3Params(**attrs_seen) self.response: dns.message.Message = response - self.owners_present: Set[dns.name.Name] = owners_seen - self.owners_used: Set[dns.name.Name] = set() + self.owners_present: set[dns.name.Name] = owners_seen + self.owners_used: set[dns.name.Name] = set() @staticmethod def nsec3_covers(rrset: dns.rrset.RRset, hashed_name: dns.name.Name) -> bool: diff --git a/bin/tests/system/resolver/ans2/ans.py b/bin/tests/system/resolver/ans2/ans.py index 130d778df96..165f6c14415 100644 --- a/bin/tests/system/resolver/ans2/ans.py +++ b/bin/tests/system/resolver/ans2/ans.py @@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional information regarding copyright ownership. """ -from typing import AsyncGenerator, Tuple, Union +from typing import AsyncGenerator, Union import dns.edns import dns.name @@ -55,7 +55,7 @@ class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler): def _cname_rrsets( qname: Union[dns.name.Name, str], -) -> Tuple[dns.rrset.RRset, dns.rrset.RRset]: +) -> tuple[dns.rrset.RRset, dns.rrset.RRset]: return ( rrset(qname, dns.rdatatype.CNAME, f"{qname}"), rrset(qname, dns.rdatatype.A, "1.2.3.4"), diff --git a/bin/tests/system/resolver/resolver_ans.py b/bin/tests/system/resolver/resolver_ans.py index e5d7854fd93..d06b1493866 100644 --- a/bin/tests/system/resolver/resolver_ans.py +++ b/bin/tests/system/resolver/resolver_ans.py @@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional information regarding copyright ownership. """ -from typing import AsyncGenerator, List, NamedTuple, Union +from typing import AsyncGenerator, NamedTuple, Union import abc @@ -40,7 +40,7 @@ def rrset( def rrset_from_list( qname: Union[dns.name.Name, str], rtype: dns.rdatatype.RdataType, - rdata_list: List[str], + rdata_list: list[str], ttl: int = 300, ) -> dns.rrset.RRset: return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list) diff --git a/bin/tests/system/rollover/setup.py b/bin/tests/system/rollover/setup.py index f0953e97da0..15bbee9a706 100644 --- a/bin/tests/system/rollover/setup.py +++ b/bin/tests/system/rollover/setup.py @@ -9,8 +9,6 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import List - import shutil from isctest.kasp import SettimeOptions, private_type_record @@ -21,7 +19,7 @@ from isctest.vars.algorithms import Algorithm import isctest -def configure_tld(zonename: str, delegations: List[Zone]) -> Zone: +def configure_tld(zonename: str, delegations: list[Zone]) -> Zone: templates = isctest.template.TemplateEngine(".") alg = Algorithm.default() keygen = EnvCmd("KEYGEN", f"-q -a {alg.number} -b {alg.bits} -L 3600") @@ -55,7 +53,7 @@ def configure_tld(zonename: str, delegations: List[Zone]) -> Zone: return Zone(zonename, f"{outfile}.signed", Nameserver("ns2", "10.53.0.2")) -def configure_root(delegations: List[Zone]) -> TrustAnchor: +def configure_root(delegations: list[Zone]) -> TrustAnchor: templates = isctest.template.TemplateEngine(".") alg = Algorithm.default() keygen = EnvCmd("KEYGEN", f"-q -a {alg.number} -b {alg.bits} -L 3600") @@ -115,7 +113,7 @@ def setkeytimes(key_name: str, options: SettimeOptions): def render_and_sign_zone( - zonename: str, keys: List[str], signing: bool = True, extra_options: str = "" + zonename: str, keys: list[str], signing: bool = True, extra_options: str = "" ): dnskeys = [] privaterrs = [] @@ -142,7 +140,7 @@ def render_and_sign_zone( ) -def configure_algo_csk(tld: str, policy: str, reconfig: bool = False) -> List[Zone]: +def configure_algo_csk(tld: str, policy: str, reconfig: bool = False) -> list[Zone]: # The zones at csk-algorithm-roll.$tld represent the various steps # of a CSK algorithm rollover. zones = [] @@ -341,7 +339,7 @@ def configure_algo_csk(tld: str, policy: str, reconfig: bool = False) -> List[Zo return zones -def configure_algo_ksk_zsk(tld: str, reconfig: bool = False) -> List[Zone]: +def configure_algo_ksk_zsk(tld: str, reconfig: bool = False) -> list[Zone]: # The zones at algorithm-roll.$tld represent the various steps of a ZSK/KSK # algorithm rollover. zones = [] @@ -659,7 +657,7 @@ def configure_algo_ksk_zsk(tld: str, reconfig: bool = False) -> List[Zone]: return zones -def configure_cskroll1(tld: str, policy: str) -> List[Zone]: +def configure_cskroll1(tld: str, policy: str) -> list[Zone]: # The zones at csk-roll1.$tld represent the various steps of a CSK rollover # (which is essentially a ZSK Pre-Publication / KSK Double-KSK rollover). zones = [] @@ -1053,7 +1051,7 @@ def configure_cskroll1(tld: str, policy: str) -> List[Zone]: return zones -def configure_cskroll2(tld: str, policy: str) -> List[Zone]: +def configure_cskroll2(tld: str, policy: str) -> list[Zone]: # The zones at csk-roll2.$tld represent the various steps of a CSK rollover # (which is essentially a ZSK Pre-Publication / KSK Double-KSK rollover). # This scenario differs from the csk-roll1 one because the zone signatures (ZRRSIG) @@ -1464,7 +1462,7 @@ def configure_cskroll2(tld: str, policy: str) -> List[Zone]: return zones -def configure_enable_dnssec(tld: str, policy: str) -> List[Zone]: +def configure_enable_dnssec(tld: str, policy: str) -> list[Zone]: # The zones at enable-dnssec.$tld represent the various steps of the # initial signing of a zone. zones = [] @@ -1559,7 +1557,7 @@ def configure_enable_dnssec(tld: str, policy: str) -> List[Zone]: return zones -def configure_going_insecure(tld: str, reconfig: bool = False) -> List[Zone]: +def configure_going_insecure(tld: str, reconfig: bool = False) -> list[Zone]: zones = [] keygen = EnvCmd("KEYGEN", "-a ECDSA256 -L 7200") @@ -1639,7 +1637,7 @@ def configure_going_insecure(tld: str, reconfig: bool = False) -> List[Zone]: return zones -def configure_straight2none(tld: str) -> List[Zone]: +def configure_straight2none(tld: str) -> list[Zone]: # These zones are going straight to "none" policy. This is undefined behavior. zones = [] keygen = EnvCmd("KEYGEN", "-k default") @@ -1687,7 +1685,7 @@ def configure_straight2none(tld: str) -> List[Zone]: return zones -def configure_ksk_doubleksk(tld: str) -> List[Zone]: +def configure_ksk_doubleksk(tld: str) -> list[Zone]: # The zones at ksk-doubleksk.$tld represent the various steps of a KSK # Double-KSK rollover. zones = [] @@ -2003,7 +2001,7 @@ def configure_ksk_doubleksk(tld: str) -> List[Zone]: return zones -def configure_ksk_3crowd(tld: str) -> List[Zone]: +def configure_ksk_3crowd(tld: str) -> list[Zone]: # Test #2375, the "three is a crowd" bug, where a new key is introduced but the # previous rollover has not finished yet. In other words, we have a key KEY2 # that is the successor of key KEY1, and we introduce a new key KEY3 that is @@ -2072,7 +2070,7 @@ def configure_ksk_3crowd(tld: str) -> List[Zone]: return zones -def configure_zsk_prepub(tld: str) -> List[Zone]: +def configure_zsk_prepub(tld: str) -> list[Zone]: # The zones at zsk-prepub.$tld represent the various steps of a ZSK # Pre-Publication rollover. zones = [] diff --git a/pyproject.toml b/pyproject.toml index 51beb6bef8a..f4b90f2cf6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,6 +96,8 @@ lint.select = [ "F401", # sorted __all__ "RUF022", + # unnecessary `typing` imports + "UP006", # f-strings "UP031", "UP032",