From: Bob Halley Date: Sat, 30 Aug 2025 19:30:41 +0000 (-0700) Subject: Type syntax (#1218) X-Git-Tag: v2.8.0rc1~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=641633503ee6b07bf382594ac04707072b90de18;p=thirdparty%2Fdnspython.git Type syntax (#1218) * use | type syntax when possible instead of Optional and Union * fix other unused-import errors and re-enable F401 --- diff --git a/dns/_tls_util.py b/dns/_tls_util.py index 79c421d0..10ddf727 100644 --- a/dns/_tls_util.py +++ b/dns/_tls_util.py @@ -1,14 +1,14 @@ # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license import os -from typing import Optional, Tuple, Union +from typing import Tuple def convert_verify_to_cafile_and_capath( - verify: Union[bool, str], -) -> Tuple[Optional[str], Optional[str]]: - cafile: Optional[str] = None - capath: Optional[str] = None + verify: bool | str, +) -> Tuple[str | None, str | None]: + cafile: str | None = None + capath: str | None = None if isinstance(verify, str): if os.path.isfile(verify): cafile = verify diff --git a/dns/asyncquery.py b/dns/asyncquery.py index c7b43083..bb770458 100644 --- a/dns/asyncquery.py +++ b/dns/asyncquery.py @@ -24,7 +24,7 @@ import socket import struct import time import urllib.parse -from typing import Any, Dict, Optional, Tuple, Union, cast +from typing import Any, Dict, Optional, Tuple, cast import dns.asyncbackend import dns.exception @@ -90,9 +90,9 @@ def _timeout(expiration, now=None): async def send_udp( sock: dns.asyncbackend.DatagramSocket, - what: Union[dns.message.Message, bytes], + what: dns.message.Message | bytes, destination: Any, - expiration: Optional[float] = None, + expiration: float | None = None, ) -> Tuple[int, float]: """Send a DNS message to the specified UDP socket. @@ -120,16 +120,16 @@ async def send_udp( async def receive_udp( sock: dns.asyncbackend.DatagramSocket, - destination: Optional[Any] = None, - expiration: Optional[float] = None, + destination: Any | None = None, + expiration: float | None = None, ignore_unexpected: bool = False, one_rr_per_rrset: bool = False, - keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None, - request_mac: Optional[bytes] = b"", + keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None, + request_mac: bytes | None = b"", ignore_trailing: bool = False, raise_on_truncation: bool = False, ignore_errors: bool = False, - query: Optional[dns.message.Message] = None, + query: dns.message.Message | None = None, ) -> Any: """Read a DNS message from a UDP socket. @@ -182,16 +182,16 @@ async def receive_udp( async def udp( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 53, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, ignore_unexpected: bool = False, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, raise_on_truncation: bool = False, - sock: Optional[dns.asyncbackend.DatagramSocket] = None, - backend: Optional[dns.asyncbackend.Backend] = None, + sock: dns.asyncbackend.DatagramSocket | None = None, + backend: dns.asyncbackend.Backend | None = None, ignore_errors: bool = False, ) -> dns.message.Message: """Return the response obtained after sending a query via UDP. @@ -248,16 +248,16 @@ async def udp( async def udp_with_fallback( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 53, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, ignore_unexpected: bool = False, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - udp_sock: Optional[dns.asyncbackend.DatagramSocket] = None, - tcp_sock: Optional[dns.asyncbackend.StreamSocket] = None, - backend: Optional[dns.asyncbackend.Backend] = None, + udp_sock: dns.asyncbackend.DatagramSocket | None = None, + tcp_sock: dns.asyncbackend.StreamSocket | None = None, + backend: dns.asyncbackend.Backend | None = None, ignore_errors: bool = False, ) -> Tuple[dns.message.Message, bool]: """Return the response to the query, trying UDP first and falling back @@ -315,8 +315,8 @@ async def udp_with_fallback( async def send_tcp( sock: dns.asyncbackend.StreamSocket, - what: Union[dns.message.Message, bytes], - expiration: Optional[float] = None, + what: dns.message.Message | bytes, + expiration: float | None = None, ) -> Tuple[int, float]: """Send a DNS message to the specified TCP socket. @@ -354,10 +354,10 @@ async def _read_exactly(sock, count, expiration): async def receive_tcp( sock: dns.asyncbackend.StreamSocket, - expiration: Optional[float] = None, + expiration: float | None = None, one_rr_per_rrset: bool = False, - keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None, - request_mac: Optional[bytes] = b"", + keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None, + request_mac: bytes | None = b"", ignore_trailing: bool = False, ) -> Tuple[dns.message.Message, float]: """Read a DNS message from a TCP socket. @@ -385,14 +385,14 @@ async def receive_tcp( async def tcp( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 53, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - sock: Optional[dns.asyncbackend.StreamSocket] = None, - backend: Optional[dns.asyncbackend.Backend] = None, + sock: dns.asyncbackend.StreamSocket | None = None, + backend: dns.asyncbackend.Backend | None = None, ) -> dns.message.Message: """Return the response obtained after sending a query via TCP. @@ -446,17 +446,17 @@ async def tcp( async def tls( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 853, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - sock: Optional[dns.asyncbackend.StreamSocket] = None, - backend: Optional[dns.asyncbackend.Backend] = None, - ssl_context: Optional[ssl.SSLContext] = None, - server_hostname: Optional[str] = None, - verify: Union[bool, str] = True, + sock: dns.asyncbackend.StreamSocket | None = None, + backend: dns.asyncbackend.Backend | None = None, + ssl_context: ssl.SSLContext | None = None, + server_hostname: str | None = None, + verify: bool | str = True, ) -> dns.message.Message: """Return the response obtained after sending a query via TLS. @@ -530,17 +530,17 @@ def _maybe_get_resolver( async def https( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 443, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, # pylint: disable=W0613 one_rr_per_rrset: bool = False, ignore_trailing: bool = False, client: Optional["httpx.AsyncClient|dns.quic.AsyncQuicConnection"] = None, path: str = "/dns-query", post: bool = True, - verify: Union[bool, str, ssl.SSLContext] = True, - bootstrap_address: Optional[str] = None, + verify: bool | str | ssl.SSLContext = True, + bootstrap_address: str | None = None, resolver: Optional["dns.asyncresolver.Resolver"] = None, # pyright: ignore family: int = socket.AF_UNSPEC, http_version: HTTPVersion = HTTPVersion.DEFAULT, @@ -707,16 +707,16 @@ async def _http3( q: dns.message.Message, where: str, url: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 443, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - verify: Union[bool, str, ssl.SSLContext] = True, - backend: Optional[dns.asyncbackend.Backend] = None, + verify: bool | str | ssl.SSLContext = True, + backend: dns.asyncbackend.Backend | None = None, post: bool = True, - connection: Optional[dns.quic.AsyncQuicConnection] = None, + connection: dns.quic.AsyncQuicConnection | None = None, ) -> dns.message.Message: if not dns.quic.have_quic: raise NoDOH("DNS-over-HTTP3 is not available.") # pragma: no cover @@ -770,17 +770,17 @@ async def _http3( async def quic( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 853, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - connection: Optional[dns.quic.AsyncQuicConnection] = None, - verify: Union[bool, str] = True, - backend: Optional[dns.asyncbackend.Backend] = None, - hostname: Optional[str] = None, - server_hostname: Optional[str] = None, + connection: dns.quic.AsyncQuicConnection | None = None, + verify: bool | str = True, + backend: dns.asyncbackend.Backend | None = None, + hostname: str | None = None, + server_hostname: str | None = None, ) -> dns.message.Message: """Return the response obtained after sending an asynchronous query via DNS-over-QUIC. @@ -841,8 +841,8 @@ async def _inbound_xfr( txn_manager: dns.transaction.TransactionManager, s: dns.asyncbackend.Socket, query: dns.message.Message, - serial: Optional[int], - timeout: Optional[float], + serial: int | None, + timeout: float | None, expiration: float, ) -> Any: """Given a socket, does the zone transfer.""" @@ -861,7 +861,7 @@ async def _inbound_xfr( with dns.xfr.Inbound(txn_manager, rdtype, serial, is_udp) as inbound: done = False tsig_ctx = None - r: Optional[dns.message.Message] = None + r: dns.message.Message | None = None while not done: (_, mexpiration) = _compute_times(timeout) if mexpiration is None or ( @@ -895,14 +895,14 @@ async def _inbound_xfr( async def inbound_xfr( where: str, txn_manager: dns.transaction.TransactionManager, - query: Optional[dns.message.Message] = None, + query: dns.message.Message | None = None, port: int = 53, - timeout: Optional[float] = None, - lifetime: Optional[float] = None, - source: Optional[str] = None, + timeout: float | None = None, + lifetime: float | None = None, + source: str | None = None, source_port: int = 0, udp_mode: UDPMode = UDPMode.NEVER, - backend: Optional[dns.asyncbackend.Backend] = None, + backend: dns.asyncbackend.Backend | None = None, ) -> None: """Conduct an inbound transfer and apply it via a transaction from the txn_manager. diff --git a/dns/asyncresolver.py b/dns/asyncresolver.py index 1df89e6c..6f8c69fd 100644 --- a/dns/asyncresolver.py +++ b/dns/asyncresolver.py @@ -19,7 +19,7 @@ import socket import time -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List import dns._ddr import dns.asyncbackend @@ -47,16 +47,16 @@ class Resolver(dns.resolver.BaseResolver): async def resolve( self, - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, tcp: bool = False, - source: Optional[str] = None, + source: str | None = None, raise_on_no_answer: bool = True, source_port: int = 0, - lifetime: Optional[float] = None, - search: Optional[bool] = None, - backend: Optional[dns.asyncbackend.Backend] = None, + lifetime: float | None = None, + search: bool | None = None, + backend: dns.asyncbackend.Backend | None = None, ) -> dns.resolver.Answer: """Query nameservers asynchronously to find the answer to the question. @@ -140,7 +140,7 @@ class Resolver(dns.resolver.BaseResolver): async def resolve_name( self, - name: Union[dns.name.Name, str], + name: dns.name.Name | str, family: int = socket.AF_UNSPEC, **kwargs: Any, ) -> dns.resolver.HostAnswers: @@ -207,7 +207,7 @@ class Resolver(dns.resolver.BaseResolver): # pylint: disable=redefined-outer-name - async def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name: + async def canonical_name(self, name: dns.name.Name | str) -> dns.name.Name: """Determine the canonical name of *name*. The canonical name is the name the resolver uses for queries @@ -283,16 +283,16 @@ def reset_default_resolver() -> None: async def resolve( - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, tcp: bool = False, - source: Optional[str] = None, + source: str | None = None, raise_on_no_answer: bool = True, source_port: int = 0, - lifetime: Optional[float] = None, - search: Optional[bool] = None, - backend: Optional[dns.asyncbackend.Backend] = None, + lifetime: float | None = None, + search: bool | None = None, + backend: dns.asyncbackend.Backend | None = None, ) -> dns.resolver.Answer: """Query nameservers asynchronously to find the answer to the question. @@ -330,7 +330,7 @@ async def resolve_address( async def resolve_name( - name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any + name: dns.name.Name | str, family: int = socket.AF_UNSPEC, **kwargs: Any ) -> dns.resolver.HostAnswers: """Use a resolver to asynchronously query for address records. @@ -341,7 +341,7 @@ async def resolve_name( return await get_default_resolver().resolve_name(name, family, **kwargs) -async def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name: +async def canonical_name(name: dns.name.Name | str) -> dns.name.Name: """Determine the canonical name of *name*. See :py:func:`dns.resolver.Resolver.canonical_name` for more @@ -362,11 +362,11 @@ async def try_ddr(timeout: float = 5.0) -> None: async def zone_for_name( - name: Union[dns.name.Name, str], + name: dns.name.Name | str, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, tcp: bool = False, - resolver: Optional[Resolver] = None, - backend: Optional[dns.asyncbackend.Backend] = None, + resolver: Resolver | None = None, + backend: dns.asyncbackend.Backend | None = None, ) -> dns.name.Name: """Find the name of the zone which contains the specified name. @@ -398,10 +398,10 @@ async def zone_for_name( async def make_resolver_at( - where: Union[dns.name.Name, str], + where: dns.name.Name | str, port: int = 53, family: int = socket.AF_UNSPEC, - resolver: Optional[Resolver] = None, + resolver: Resolver | None = None, ) -> Resolver: """Make a stub resolver using the specified destination as the full resolver. @@ -422,7 +422,7 @@ async def make_resolver_at( """ if resolver is None: resolver = get_default_resolver() - nameservers: List[Union[str, dns.nameserver.Nameserver]] = [] + nameservers: List[str | dns.nameserver.Nameserver] = [] if isinstance(where, str) and dns.inet.is_address(where): nameservers.append(dns.nameserver.Do53Nameserver(where, port)) else: @@ -435,20 +435,20 @@ async def make_resolver_at( async def resolve_at( - where: Union[dns.name.Name, str], - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, + where: dns.name.Name | str, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, tcp: bool = False, - source: Optional[str] = None, + source: str | None = None, raise_on_no_answer: bool = True, source_port: int = 0, - lifetime: Optional[float] = None, - search: Optional[bool] = None, - backend: Optional[dns.asyncbackend.Backend] = None, + lifetime: float | None = None, + search: bool | None = None, + backend: dns.asyncbackend.Backend | None = None, port: int = 53, family: int = socket.AF_UNSPEC, - resolver: Optional[Resolver] = None, + resolver: Resolver | None = None, ) -> dns.resolver.Answer: """Query nameservers to find the answer to the question. diff --git a/dns/btree.py b/dns/btree.py index f544edd8..12da9f56 100644 --- a/dns/btree.py +++ b/dns/btree.py @@ -130,7 +130,7 @@ class _Node(Generic[KT, ET]): child = self.maybe_cow_child(i) return child._get_node(key) - def get(self, key: KT) -> Optional[ET]: + def get(self, key: KT) -> ET | None: """Get the element associated with *key* or return ``None``""" i, equal = self.search_in_node(key) if equal: @@ -158,7 +158,7 @@ class _Node(Generic[KT, ET]): if not left.try_right_steal(self, index - 1): break - def insert_nonfull(self, element: ET, in_order: bool) -> Optional[ET]: + def insert_nonfull(self, element: ET, in_order: bool) -> ET | None: assert not self.is_maximal() while True: key = element.key() @@ -292,8 +292,8 @@ class _Node(Generic[KT, ET]): left.merge(parent, index - 1) def delete( - self, key: KT, parent: Optional["_Node[KT, ET]"], exact: Optional[ET] - ) -> Optional[ET]: + self, key: KT, parent: Optional["_Node[KT, ET]"], exact: ET | None + ) -> ET | None: """Delete an element matching *key* if it exists. If *exact* is not ``None`` then it must be an exact match with that element. The Node must not be minimal unless it is the root.""" @@ -388,7 +388,7 @@ class Cursor(Generic[KT, ET]): def __init__(self, btree: "BTree[KT, ET]"): self.btree = btree - self.current_node: Optional[_Node] = None + self.current_node: _Node | None = None # The current index is the element index within the current node, or # if there is no current node then it is 0 on the left boundary and 1 # on the right boundary. @@ -397,7 +397,7 @@ class Cursor(Generic[KT, ET]): self.increasing = True self.parents: list[tuple[_Node, int]] = [] self.parked = False - self.parking_key: Optional[KT] = None + self.parking_key: KT | None = None self.parking_key_read = False def _seek_least(self) -> None: @@ -451,7 +451,7 @@ class Cursor(Generic[KT, ET]): self.parked = False self.parking_key = None - def prev(self) -> Optional[ET]: + def prev(self) -> ET | None: """Get the previous element, or return None if on the left boundary.""" self._maybe_unpark() self.parking_key = None @@ -491,7 +491,7 @@ class Cursor(Generic[KT, ET]): self.current_index = 0 return None - def next(self) -> Optional[ET]: + def next(self) -> ET | None: """Get the next element, or return None if on the right boundary.""" self._maybe_unpark() self.parking_key = None @@ -662,7 +662,7 @@ class BTree(Generic[KT, ET]): # delete_key() so that BTreeDict can be a proper MutableMapping and supply the # rest of the standard mapping API. - def insert_element(self, elt: ET, in_order: bool = False) -> Optional[ET]: + def insert_element(self, elt: ET, in_order: bool = False) -> ET | None: """Insert the element into the BTree. If *in_order* is ``True``, then extra work will be done to make left siblings @@ -685,13 +685,13 @@ class BTree(Generic[KT, ET]): self.size += 1 return oelt - def get_element(self, key: KT) -> Optional[ET]: + def get_element(self, key: KT) -> ET | None: """Get the element matching *key* from the BTree, or return ``None`` if it does not exist. """ return self.root.get(key) - def _delete(self, key: KT, exact: Optional[ET]) -> Optional[ET]: + def _delete(self, key: KT, exact: ET | None) -> ET | None: self._check_mutable_and_park() cloned = self.root.maybe_cow(self.creator) if cloned: @@ -708,14 +708,14 @@ class BTree(Generic[KT, ET]): self.root = self.root.children[0] return elt - def delete_key(self, key: KT) -> Optional[ET]: + def delete_key(self, key: KT) -> ET | None: """Delete the element matching *key* from the BTree. Returns the matching element or ``None`` if it does not exist. """ return self._delete(key, None) - def delete_exact(self, element: ET) -> Optional[ET]: + def delete_exact(self, element: ET) -> ET | None: """Delete *element* from the BTree. Returns the matching element or ``None`` if it was not in the BTree. @@ -791,7 +791,7 @@ class BTreeDict(Generic[KT, VT], BTree[KT, KV[KT, VT]], MutableMapping[KT, VT]): self, *, t: int = DEFAULT_T, - original: Optional[BTree] = None, + original: BTree | None = None, in_order: bool = False, ): super().__init__(t=t, original=original) @@ -833,7 +833,7 @@ class BTreeSet(BTree, Generic[KT], MutableSet[KT]): self, *, t: int = DEFAULT_T, - original: Optional[BTree] = None, + original: BTree | None = None, in_order: bool = False, ): super().__init__(t=t, original=original) diff --git a/dns/btreezone.py b/dns/btreezone.py index c71bd5c6..27b5bb6f 100644 --- a/dns/btreezone.py +++ b/dns/btreezone.py @@ -13,7 +13,7 @@ import enum from dataclasses import dataclass -from typing import Callable, MutableMapping, Optional, Tuple, Union, cast +from typing import Callable, MutableMapping, Tuple, cast import dns.btree import dns.immutable @@ -35,7 +35,7 @@ class NodeFlags(enum.IntFlag): class Node(dns.node.Node): __slots__ = ["flags", "id"] - def __init__(self, flags: Optional[NodeFlags] = None): + def __init__(self, flags: NodeFlags | None = None): super().__init__() if flags is None: # We allow optional flags rather than a default @@ -85,7 +85,7 @@ class ImmutableNode(Node): rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, create: bool = False, - ) -> Optional[dns.rdataset.Rdataset]: + ) -> dns.rdataset.Rdataset | None: if create: raise TypeError("immutable") return super().get_rdataset(rdclass, rdtype, covers, False) @@ -106,9 +106,7 @@ class ImmutableNode(Node): class Delegations(dns.btree.BTreeSet[dns.name.Name]): - def get_delegation( - self, name: dns.name.Name - ) -> Tuple[Optional[dns.name.Name], bool]: + def get_delegation(self, name: dns.name.Name) -> Tuple[dns.name.Name | None, bool]: """Get the delegation applicable to *name*, if it exists. If there delegation, then return a tuple consisting of the name of @@ -245,7 +243,7 @@ class WritableVersion(dns.zone.WritableVersion): class Bounds: name: dns.name.Name left: dns.name.Name - right: Optional[dns.name.Name] + right: dns.name.Name | None closest_encloser: dns.name.Name is_equal: bool is_delegation: bool @@ -286,7 +284,7 @@ class ImmutableVersion(dns.zone.Version): self.delegations = version.delegations self.delegations.make_immutable() - def bounds(self, name: Union[dns.name.Name, str]) -> Bounds: + def bounds(self, name: dns.name.Name | str) -> Bounds: """Return the 'bounds' of *name* in its zone. The bounds information is useful when making an authoritative response, as @@ -361,9 +359,9 @@ class Zone(dns.versioned.Zone): Callable[[], MutableMapping[dns.name.Name, dns.node.Node]], dns.btree.BTreeDict[dns.name.Name, Node], ) - writable_version_factory: Optional[ - Callable[[dns.zone.Zone, bool], dns.zone.Version] - ] = WritableVersion - immutable_version_factory: Optional[ - Callable[[dns.zone.Version], dns.zone.Version] - ] = ImmutableVersion + writable_version_factory: ( + Callable[[dns.zone.Zone, bool], dns.zone.Version] | None + ) = WritableVersion + immutable_version_factory: Callable[[dns.zone.Version], dns.zone.Version] | None = ( + ImmutableVersion + ) diff --git a/dns/dnssec.py b/dns/dnssec.py index 5cb7cfcb..86a7e854 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -25,10 +25,9 @@ import hashlib import struct import time from datetime import datetime -from typing import Callable, Dict, List, Optional, Set, Tuple, Union, cast +from typing import Callable, Dict, List, Set, Tuple, Union, cast import dns._features -import dns.exception import dns.name import dns.node import dns.rdata @@ -39,12 +38,8 @@ import dns.rrset import dns.transaction import dns.zone from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash -from dns.exception import ( # pylint: disable=W0611 - AlgorithmKeyMismatch, - DeniedByPolicy, - UnsupportedAlgorithm, - ValidationFailure, -) +from dns.exception import AlgorithmKeyMismatch as AlgorithmKeyMismatch +from dns.exception import DeniedByPolicy, UnsupportedAlgorithm, ValidationFailure from dns.rdtypes.ANY.CDNSKEY import CDNSKEY from dns.rdtypes.ANY.CDS import CDS from dns.rdtypes.ANY.DNSKEY import DNSKEY @@ -84,7 +79,7 @@ def algorithm_from_text(text: str) -> Algorithm: return Algorithm.from_text(text) -def algorithm_to_text(value: Union[Algorithm, int]) -> str: +def algorithm_to_text(value: Algorithm | int) -> str: """Convert a DNSSEC algorithm value to text *value*, a ``dns.dnssec.Algorithm``. @@ -95,7 +90,7 @@ def algorithm_to_text(value: Union[Algorithm, int]) -> str: return Algorithm.to_text(value) -def to_timestamp(value: Union[datetime, str, float, int]) -> int: +def to_timestamp(value: datetime | str | float | int) -> int: """Convert various format to a timestamp""" if isinstance(value, datetime): return int(value.timestamp()) @@ -109,7 +104,7 @@ def to_timestamp(value: Union[datetime, str, float, int]) -> int: raise TypeError("Unsupported timestamp type") -def key_id(key: Union[DNSKEY, CDNSKEY]) -> int: +def key_id(key: DNSKEY | CDNSKEY) -> int: """Return the key id (a 16-bit number) for the specified key. *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` @@ -183,11 +178,11 @@ default_policy = rfc_8624_policy def make_ds( - name: Union[dns.name.Name, str], + name: dns.name.Name | str, key: dns.rdata.Rdata, - algorithm: Union[DSDigest, str], - origin: Optional[dns.name.Name] = None, - policy: Optional[Policy] = None, + algorithm: DSDigest | str, + origin: dns.name.Name | None = None, + policy: Policy | None = None, validating: bool = False, ) -> DS: """Create a DS record for a DNSSEC key. @@ -232,8 +227,8 @@ def make_ds( check = policy.ok_to_create_ds if not check(algorithm): raise DeniedByPolicy - if not isinstance(key, (DNSKEY, CDNSKEY)): - raise ValueError("key is not a DNSKEY/CDNSKEY") + if not isinstance(key, DNSKEY | CDNSKEY): + raise ValueError("key is not a DNSKEY | CDNSKEY") if algorithm == DSDigest.SHA1: dshash = hashlib.sha1() elif algorithm == DSDigest.SHA256: @@ -260,10 +255,10 @@ def make_ds( def make_cds( - name: Union[dns.name.Name, str], + name: dns.name.Name | str, key: dns.rdata.Rdata, - algorithm: Union[DSDigest, str], - origin: Optional[dns.name.Name] = None, + algorithm: DSDigest | str, + origin: dns.name.Name | None = None, ) -> CDS: """Create a CDS record for a DNSSEC key. @@ -296,8 +291,8 @@ def make_cds( def _find_candidate_keys( - keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG -) -> Optional[List[DNSKEY]]: + keys: Dict[dns.name.Name, dns.rdataset.Rdataset | dns.node.Node], rrsig: RRSIG +) -> List[DNSKEY] | None: value = keys.get(rrsig.signer) if isinstance(value, dns.node.Node): rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY) @@ -316,7 +311,7 @@ def _find_candidate_keys( def _get_rrname_rdataset( - rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], + rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset], ) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]: if isinstance(rrset, tuple): return rrset[0], rrset[1] @@ -335,12 +330,12 @@ def _validate_signature(sig: bytes, data: bytes, key: DNSKEY) -> None: def _validate_rrsig( - rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], + rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset], rrsig: RRSIG, - keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], - origin: Optional[dns.name.Name] = None, - now: Optional[float] = None, - policy: Optional[Policy] = None, + keys: Dict[dns.name.Name, dns.node.Node | dns.rdataset.Rdataset], + origin: dns.name.Name | None = None, + now: float | None = None, + policy: Policy | None = None, ) -> None: """Validate an RRset against a single signature rdata, throwing an exception if validation is not successful. @@ -405,12 +400,12 @@ def _validate_rrsig( def _validate( - rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], - rrsigset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], - keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], - origin: Optional[dns.name.Name] = None, - now: Optional[float] = None, - policy: Optional[Policy] = None, + rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset], + rrsigset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset], + keys: Dict[dns.name.Name, dns.node.Node | dns.rdataset.Rdataset], + origin: dns.name.Name | None = None, + now: float | None = None, + policy: Policy | None = None, ) -> None: """Validate an RRset against a signature RRset, throwing an exception if none of the signatures validate. @@ -478,16 +473,16 @@ def _validate( def _sign( - rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], + rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset], private_key: PrivateKey, signer: dns.name.Name, dnskey: DNSKEY, - inception: Optional[Union[datetime, str, int, float]] = None, - expiration: Optional[Union[datetime, str, int, float]] = None, - lifetime: Optional[int] = None, + inception: datetime | str | int | float | None = None, + expiration: datetime | str | int | float | None = None, + lifetime: int | None = None, verify: bool = False, - policy: Optional[Policy] = None, - origin: Optional[dns.name.Name] = None, + policy: Policy | None = None, + origin: dns.name.Name | None = None, deterministic: bool = True, ) -> RRSIG: """Sign RRset using private key. @@ -605,9 +600,9 @@ def _sign( def _make_rrsig_signature_data( - rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], + rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset], rrsig: RRSIG, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, ) -> bytes: """Create signature rdata. @@ -673,7 +668,7 @@ def _make_rrsig_signature_data( def _make_dnskey( public_key: PublicKey, - algorithm: Union[int, str], + algorithm: int | str, flags: int = Flag.ZONE, protocol: int = 3, ) -> DNSKEY: @@ -708,7 +703,7 @@ def _make_dnskey( def _make_cdnskey( public_key: PublicKey, - algorithm: Union[int, str], + algorithm: int | str, flags: int = Flag.ZONE, protocol: int = 3, ) -> CDNSKEY: @@ -745,10 +740,10 @@ def _make_cdnskey( def nsec3_hash( - domain: Union[dns.name.Name, str], - salt: Optional[Union[str, bytes]], + domain: dns.name.Name | str, + salt: str | bytes | None, iterations: int, - algorithm: Union[int, str], + algorithm: int | str, ) -> str: """ Calculate the NSEC3 hash, according to @@ -806,9 +801,9 @@ def nsec3_hash( def make_ds_rdataset( - rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], - algorithms: Set[Union[DSDigest, str]], - origin: Optional[dns.name.Name] = None, + rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset], + algorithms: Set[DSDigest | str], + origin: dns.name.Name | None = None, ) -> dns.rdataset.Rdataset: """Create a DS record from DNSKEY/CDNSKEY/CDS. @@ -893,10 +888,10 @@ def cds_rdataset_to_ds_rdataset( def dnskey_rdataset_to_cds_rdataset( - name: Union[dns.name.Name, str], + name: dns.name.Name | str, rdataset: dns.rdataset.Rdataset, - algorithm: Union[DSDigest, str], - origin: Optional[dns.name.Name] = None, + algorithm: DSDigest | str, + origin: dns.name.Name | None = None, ) -> dns.rdataset.Rdataset: """Create a CDS record from DNSKEY/CDNSKEY. @@ -958,11 +953,11 @@ def default_rrset_signer( signer: dns.name.Name, ksks: List[Tuple[PrivateKey, DNSKEY]], zsks: List[Tuple[PrivateKey, DNSKEY]], - inception: Optional[Union[datetime, str, int, float]] = None, - expiration: Optional[Union[datetime, str, int, float]] = None, - lifetime: Optional[int] = None, - policy: Optional[Policy] = None, - origin: Optional[dns.name.Name] = None, + inception: datetime | str | int | float | None = None, + expiration: datetime | str | int | float | None = None, + lifetime: int | None = None, + policy: Policy | None = None, + origin: dns.name.Name | None = None, deterministic: bool = True, ) -> None: """Default RRset signer""" @@ -996,16 +991,16 @@ def default_rrset_signer( def sign_zone( zone: dns.zone.Zone, - txn: Optional[dns.transaction.Transaction] = None, - keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None, + txn: dns.transaction.Transaction | None = None, + keys: List[Tuple[PrivateKey, DNSKEY]] | None = None, add_dnskey: bool = True, - dnskey_ttl: Optional[int] = None, - inception: Optional[Union[datetime, str, int, float]] = None, - expiration: Optional[Union[datetime, str, int, float]] = None, - lifetime: Optional[int] = None, - nsec3: Optional[NSEC3PARAM] = None, - rrset_signer: Optional[RRsetSigner] = None, - policy: Optional[Policy] = None, + dnskey_ttl: int | None = None, + inception: datetime | str | int | float | None = None, + expiration: datetime | str | int | float | None = None, + lifetime: int | None = None, + nsec3: NSEC3PARAM | None = None, + rrset_signer: RRsetSigner | None = None, + policy: Policy | None = None, deterministic: bool = True, ) -> None: """Sign zone. @@ -1112,17 +1107,17 @@ def sign_zone( def _sign_zone_nsec( zone: dns.zone.Zone, txn: dns.transaction.Transaction, - rrset_signer: Optional[RRsetSigner] = None, + rrset_signer: RRsetSigner | None = None, ) -> None: """NSEC zone signer""" def _txn_add_nsec( txn: dns.transaction.Transaction, name: dns.name.Name, - next_secure: Optional[dns.name.Name], + next_secure: dns.name.Name | None, rdclass: dns.rdataclass.RdataClass, ttl: int, - rrset_signer: Optional[RRsetSigner] = None, + rrset_signer: RRsetSigner | None = None, ) -> None: """NSEC zone signer helper""" mandatory_types = set( @@ -1196,7 +1191,6 @@ def _need_pyca(*args, **kwargs): if dns._features.have("dnssec"): from cryptography.exceptions import InvalidSignature - from cryptography.hazmat.primitives.asymmetric import dsa # pylint: disable=W0611 from cryptography.hazmat.primitives.asymmetric import ec # pylint: disable=W0611 from cryptography.hazmat.primitives.asymmetric import ed448 # pylint: disable=W0611 from cryptography.hazmat.primitives.asymmetric import rsa # pylint: disable=W0611 diff --git a/dns/dnssecalgs/__init__.py b/dns/dnssecalgs/__init__.py index 780c0fdf..0810b19a 100644 --- a/dns/dnssecalgs/__init__.py +++ b/dns/dnssecalgs/__init__.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional, Tuple, Type, Union +from typing import Dict, Tuple, Type import dns._features import dns.name @@ -25,7 +25,7 @@ if dns._features.have("dnssec"): else: _have_cryptography = False -AlgorithmPrefix = Optional[Union[bytes, dns.name.Name]] +AlgorithmPrefix = bytes | dns.name.Name | None algorithms: Dict[Tuple[Algorithm, AlgorithmPrefix], Type[GenericPrivateKey]] = {} if _have_cryptography: @@ -48,7 +48,7 @@ if _have_cryptography: def get_algorithm_cls( - algorithm: Union[int, str], prefix: AlgorithmPrefix = None + algorithm: int | str, prefix: AlgorithmPrefix = None ) -> Type[GenericPrivateKey]: """Get Private Key class from Algorithm. @@ -86,10 +86,10 @@ def get_algorithm_cls_from_dnskey(dnskey: DNSKEY) -> Type[GenericPrivateKey]: def register_algorithm_cls( - algorithm: Union[int, str], + algorithm: int | str, algorithm_cls: Type[GenericPrivateKey], - name: Optional[Union[dns.name.Name, str]] = None, - oid: Optional[bytes] = None, + name: dns.name.Name | str | None = None, + oid: bytes | None = None, ) -> None: """Register Algorithm Private Key class. diff --git a/dns/dnssecalgs/base.py b/dns/dnssecalgs/base.py index 752ee480..0334fe6b 100644 --- a/dns/dnssecalgs/base.py +++ b/dns/dnssecalgs/base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod # pylint: disable=no-name-in-module -from typing import Any, Optional, Type +from typing import Any, Type import dns.rdataclass import dns.rdatatype @@ -80,10 +80,10 @@ class GenericPrivateKey(ABC): @classmethod @abstractmethod def from_pem( - cls, private_pem: bytes, password: Optional[bytes] = None + cls, private_pem: bytes, password: bytes | None = None ) -> "GenericPrivateKey": """Create private key from PEM-encoded PKCS#8""" @abstractmethod - def to_pem(self, password: Optional[bytes] = None) -> bytes: + def to_pem(self, password: bytes | None = None) -> bytes: """Return private key as PEM-encoded PKCS#8""" diff --git a/dns/dnssecalgs/cryptography.py b/dns/dnssecalgs/cryptography.py index 6fbeb1a4..a5dde6a4 100644 --- a/dns/dnssecalgs/cryptography.py +++ b/dns/dnssecalgs/cryptography.py @@ -1,4 +1,4 @@ -from typing import Any, Optional, Type +from typing import Any, Type from cryptography.hazmat.primitives import serialization @@ -50,12 +50,12 @@ class CryptographyPrivateKey(GenericPrivateKey): @classmethod def from_pem( - cls, private_pem: bytes, password: Optional[bytes] = None + cls, private_pem: bytes, password: bytes | None = None ) -> "GenericPrivateKey": key = serialization.load_pem_private_key(private_pem, password=password) return cls(key=key) - def to_pem(self, password: Optional[bytes] = None) -> bytes: + def to_pem(self, password: bytes | None = None) -> bytes: encryption_algorithm: serialization.KeySerializationEncryption if password: encryption_algorithm = serialization.BestAvailableEncryption(password) diff --git a/dns/e164.py b/dns/e164.py index dd9aebc8..942d2c0f 100644 --- a/dns/e164.py +++ b/dns/e164.py @@ -17,7 +17,7 @@ """DNS E.164 helpers.""" -from typing import Iterable, Optional, Union +from typing import Iterable import dns.exception import dns.name @@ -28,7 +28,7 @@ public_enum_domain = dns.name.from_text("e164.arpa.") def from_e164( - text: str, origin: Optional[dns.name.Name] = public_enum_domain + text: str, origin: dns.name.Name | None = public_enum_domain ) -> dns.name.Name: """Convert an E.164 number in textual form into a Name object whose value is the ENUM domain name for that number. @@ -51,7 +51,7 @@ def from_e164( def to_e164( name: dns.name.Name, - origin: Optional[dns.name.Name] = public_enum_domain, + origin: dns.name.Name | None = public_enum_domain, want_plus_prefix: bool = True, ) -> str: """Convert an ENUM domain name into an E.164 number. @@ -87,8 +87,8 @@ def to_e164( def query( number: str, - domains: Iterable[Union[dns.name.Name, str]], - resolver: Optional[dns.resolver.Resolver] = None, + domains: Iterable[dns.name.Name | str], + resolver: dns.resolver.Resolver | None = None, ) -> dns.resolver.Answer: """Look for NAPTR RRs for the specified number in the specified domains. diff --git a/dns/edns.py b/dns/edns.py index 3b5599c4..eb985484 100644 --- a/dns/edns.py +++ b/dns/edns.py @@ -21,7 +21,7 @@ import binascii import math import socket import struct -from typing import Any, Dict, Optional, Union +from typing import Any, Dict import dns.enum import dns.inet @@ -68,14 +68,14 @@ class OptionType(dns.enum.IntEnum): class Option: """Base class for all EDNS option types.""" - def __init__(self, otype: Union[OptionType, str]): + def __init__(self, otype: OptionType | str): """Initialize an option. *otype*, a ``dns.edns.OptionType``, is the option type. """ self.otype = OptionType.make(otype) - def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]: + def to_wire(self, file: Any | None = None) -> bytes | None: """Convert an option to wire format. Returns a ``bytes`` or ``None``. @@ -166,11 +166,11 @@ class GenericOption(Option): # lgtm[py/missing-equals] implementation. """ - def __init__(self, otype: Union[OptionType, str], data: Union[bytes, str]): + def __init__(self, otype: OptionType | str, data: bytes | str): super().__init__(otype) self.data = dns.rdata.Rdata._as_bytes(data, True) - def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]: + def to_wire(self, file: Any | None = None) -> bytes | None: if file: file.write(self.data) return None @@ -185,7 +185,7 @@ class GenericOption(Option): # lgtm[py/missing-equals] @classmethod def from_wire_parser( - cls, otype: Union[OptionType, str], parser: "dns.wire.Parser" + cls, otype: OptionType | str, parser: "dns.wire.Parser" ) -> Option: return cls(otype, parser.get_remaining()) @@ -193,7 +193,7 @@ class GenericOption(Option): # lgtm[py/missing-equals] class ECSOption(Option): # lgtm[py/missing-equals] """EDNS Client Subnet (ECS, RFC7871)""" - def __init__(self, address: str, srclen: Optional[int] = None, scopelen: int = 0): + def __init__(self, address: str, srclen: int | None = None, scopelen: int = 0): """*address*, a ``str``, is the client address information. *srclen*, an ``int``, the source prefix length, which is the @@ -298,7 +298,7 @@ class ECSOption(Option): # lgtm[py/missing-equals] ) return ECSOption(address, srclen, scope) - def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]: + def to_wire(self, file: Any | None = None) -> bytes | None: value = ( struct.pack("!HBB", self.family, self.srclen, self.scopelen) + self.addrdata ) @@ -310,7 +310,7 @@ class ECSOption(Option): # lgtm[py/missing-equals] @classmethod def from_wire_parser( - cls, otype: Union[OptionType, str], parser: "dns.wire.Parser" + cls, otype: OptionType | str, parser: "dns.wire.Parser" ) -> Option: family, src, scope = parser.get_struct("!HBB") addrlen = int(math.ceil(src / 8.0)) @@ -366,7 +366,7 @@ class EDEOption(Option): # lgtm[py/missing-equals] _preserve_case = {"DNSKEY", "DS", "DNSSEC", "RRSIGs", "NSEC", "NXDOMAIN"} - def __init__(self, code: Union[EDECode, str], text: Optional[str] = None): + def __init__(self, code: EDECode | str, text: str | None = None): """*code*, a ``dns.edns.EDECode`` or ``str``, the info code of the extended error. @@ -394,7 +394,7 @@ class EDEOption(Option): # lgtm[py/missing-equals] output += f": {self.text}" return output - def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]: + def to_wire(self, file: Any | None = None) -> bytes | None: value = struct.pack("!H", self.code) if self.text is not None: value += self.text.encode("utf8") @@ -407,7 +407,7 @@ class EDEOption(Option): # lgtm[py/missing-equals] @classmethod def from_wire_parser( - cls, otype: Union[OptionType, str], parser: "dns.wire.Parser" + cls, otype: OptionType | str, parser: "dns.wire.Parser" ) -> Option: code = EDECode.make(parser.get_uint16()) text = parser.get_remaining() @@ -427,7 +427,7 @@ class NSIDOption(Option): super().__init__(OptionType.NSID) self.nsid = nsid - def to_wire(self, file: Any = None) -> Optional[bytes]: + def to_wire(self, file: Any = None) -> bytes | None: if file: file.write(self.nsid) return None @@ -444,7 +444,7 @@ class NSIDOption(Option): @classmethod def from_wire_parser( - cls, otype: Union[OptionType, str], parser: dns.wire.Parser + cls, otype: OptionType | str, parser: dns.wire.Parser ) -> Option: return cls(parser.get_remaining()) @@ -459,7 +459,7 @@ class CookieOption(Option): if len(server) != 0 and (len(server) < 8 or len(server) > 32): raise ValueError("server cookie must be empty or between 8 and 32 bytes") - def to_wire(self, file: Any = None) -> Optional[bytes]: + def to_wire(self, file: Any = None) -> bytes | None: if file: file.write(self.client) if len(self.server) > 0: @@ -478,7 +478,7 @@ class CookieOption(Option): @classmethod def from_wire_parser( - cls, otype: Union[OptionType, str], parser: dns.wire.Parser + cls, otype: OptionType | str, parser: dns.wire.Parser ) -> Option: return cls(parser.get_bytes(8), parser.get_remaining()) @@ -489,7 +489,7 @@ class ReportChannelOption(Option): super().__init__(OptionType.REPORTCHANNEL) self.agent_domain = agent_domain - def to_wire(self, file: Any = None) -> Optional[bytes]: + def to_wire(self, file: Any = None) -> bytes | None: return self.agent_domain.to_wire(file) def to_text(self) -> str: @@ -497,7 +497,7 @@ class ReportChannelOption(Option): @classmethod def from_wire_parser( - cls, otype: Union[OptionType, str], parser: dns.wire.Parser + cls, otype: OptionType | str, parser: dns.wire.Parser ) -> Option: return cls(parser.get_name()) @@ -525,7 +525,7 @@ def get_option_class(otype: OptionType) -> Any: def option_from_wire_parser( - otype: Union[OptionType, str], parser: "dns.wire.Parser" + otype: OptionType | str, parser: "dns.wire.Parser" ) -> Option: """Build an EDNS option object from wire format. @@ -542,7 +542,7 @@ def option_from_wire_parser( def option_from_wire( - otype: Union[OptionType, str], wire: bytes, current: int, olen: int + otype: OptionType | str, wire: bytes, current: int, olen: int ) -> Option: """Build an EDNS option object from wire format. diff --git a/dns/entropy.py b/dns/entropy.py index 45e79e3d..64309262 100644 --- a/dns/entropy.py +++ b/dns/entropy.py @@ -20,7 +20,7 @@ import os import random import threading import time -from typing import Any, Optional, Union +from typing import Any class EntropyPool: @@ -29,9 +29,9 @@ class EntropyPool: # leaving this code doesn't hurt anything as the library code # is used if present. - def __init__(self, seed: Optional[bytes] = None): + def __init__(self, seed: bytes | None = None): self.pool_index = 0 - self.digest: Optional[bytearray] = None + self.digest: bytearray | None = None self.next_byte = 0 self.lock = threading.Lock() self.hash = hashlib.sha1() @@ -45,7 +45,7 @@ class EntropyPool: self.seeded = False self.seed_pid = 0 - def _stir(self, entropy: Union[bytes, bytearray]) -> None: + def _stir(self, entropy: bytes | bytearray) -> None: for c in entropy: if self.pool_index == self.hash_len: self.pool_index = 0 @@ -53,7 +53,7 @@ class EntropyPool: self.pool[self.pool_index] ^= b self.pool_index += 1 - def stir(self, entropy: Union[bytes, bytearray]) -> None: + def stir(self, entropy: bytes | bytearray) -> None: with self.lock: self._stir(entropy) @@ -109,7 +109,7 @@ class EntropyPool: pool = EntropyPool() -system_random: Optional[Any] +system_random: Any | None try: system_random = random.SystemRandom() except Exception: # pragma: no cover diff --git a/dns/enum.py b/dns/enum.py index 324c85da..822c9958 100644 --- a/dns/enum.py +++ b/dns/enum.py @@ -16,7 +16,7 @@ # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. import enum -from typing import Any, Optional, Type, TypeVar, Union +from typing import Any, Type, TypeVar TIntEnum = TypeVar("TIntEnum", bound="IntEnum") @@ -69,7 +69,7 @@ class IntEnum(enum.IntEnum): return text @classmethod - def make(cls: Type[TIntEnum], value: Union[int, str]) -> TIntEnum: + def make(cls: Type[TIntEnum], value: int | str) -> TIntEnum: """Convert text or a value into an enumerated type, if possible. *value*, the ``int`` or ``str`` to convert. @@ -101,7 +101,7 @@ class IntEnum(enum.IntEnum): return "" @classmethod - def _extra_from_text(cls, text: str) -> Optional[Any]: # pylint: disable=W0613 + def _extra_from_text(cls, text: str) -> Any | None: # pylint: disable=W0613 return None @classmethod diff --git a/dns/exception.py b/dns/exception.py index 223f2d68..c3d42ffd 100644 --- a/dns/exception.py +++ b/dns/exception.py @@ -22,7 +22,7 @@ always be subclasses of ``DNSException``. """ -from typing import Optional, Set +from typing import Set class DNSException(Exception): @@ -48,9 +48,9 @@ class DNSException(Exception): and ``fmt`` class variables to get nice parametrized messages. """ - msg: Optional[str] = None # non-parametrized message + msg: str | None = None # non-parametrized message supp_kwargs: Set[str] = set() # accepted parameters for _fmt_kwargs (sanity check) - fmt: Optional[str] = None # message parametrized with results from _fmt_kwargs + fmt: str | None = None # message parametrized with results from _fmt_kwargs def __init__(self, *args, **kwargs): self._check_params(*args, **kwargs) @@ -92,7 +92,7 @@ class DNSException(Exception): """ fmtargs = {} for kw, data in kwargs.items(): - if isinstance(data, (list, set)): + if isinstance(data, list | set): # convert list of to list of str() fmtargs[kw] = list(map(str, data)) if len(fmtargs[kw]) == 1: diff --git a/dns/inet.py b/dns/inet.py index 4a03f996..765203b3 100644 --- a/dns/inet.py +++ b/dns/inet.py @@ -18,7 +18,7 @@ """Generic Internet address helper functions.""" import socket -from typing import Any, Optional, Tuple +from typing import Any, Tuple import dns.ipv4 import dns.ipv6 @@ -135,9 +135,7 @@ def is_address(text: str) -> bool: return False -def low_level_address_tuple( - high_tuple: Tuple[str, int], af: Optional[int] = None -) -> Any: +def low_level_address_tuple(high_tuple: Tuple[str, int], af: int | None = None) -> Any: """Given a "high-level" address tuple, i.e. an (address, port) return the appropriate "low-level" address tuple suitable for use in socket calls. diff --git a/dns/ipv4.py b/dns/ipv4.py index 78052bfd..a7161bc7 100644 --- a/dns/ipv4.py +++ b/dns/ipv4.py @@ -18,7 +18,6 @@ """IPv4 helper functions.""" import struct -from typing import Union import dns.exception @@ -36,7 +35,7 @@ def inet_ntoa(address: bytes) -> str: return f"{address[0]}.{address[1]}.{address[2]}.{address[3]}" -def inet_aton(text: Union[str, bytes]) -> bytes: +def inet_aton(text: str | bytes) -> bytes: """Convert an IPv4 address in text form to binary form. *text*, a ``str`` or ``bytes``, the IPv4 address in textual form. @@ -64,7 +63,7 @@ def inet_aton(text: Union[str, bytes]) -> bytes: raise dns.exception.SyntaxError -def canonicalize(text: Union[str, bytes]) -> str: +def canonicalize(text: str | bytes) -> str: """Verify that *address* is a valid text form IPv4 address and return its canonical text form. diff --git a/dns/ipv6.py b/dns/ipv6.py index 4f27b415..eaa0f6c7 100644 --- a/dns/ipv6.py +++ b/dns/ipv6.py @@ -19,7 +19,7 @@ import binascii import re -from typing import List, Union +from typing import List import dns.exception import dns.ipv4 @@ -101,7 +101,7 @@ _colon_colon_start = re.compile(rb"::.*") _colon_colon_end = re.compile(rb".*::$") -def inet_aton(text: Union[str, bytes], ignore_scope: bool = False) -> bytes: +def inet_aton(text: str | bytes, ignore_scope: bool = False) -> bytes: """Convert an IPv6 address in text form to binary form. *text*, a ``str`` or ``bytes``, the IPv6 address in textual form. @@ -206,7 +206,7 @@ def is_mapped(address: bytes) -> bool: return address.startswith(_mapped_prefix) -def canonicalize(text: Union[str, bytes]) -> str: +def canonicalize(text: str | bytes) -> str: """Verify that *address* is a valid text form IPv6 address and return its canonical text form. Addresses with scopes are rejected. diff --git a/dns/message.py b/dns/message.py index cd577af9..bbfccfc7 100644 --- a/dns/message.py +++ b/dns/message.py @@ -21,7 +21,7 @@ import contextlib import enum import io import time -from typing import Any, Dict, List, Optional, Tuple, Union, cast +from typing import Any, Dict, List, Tuple, cast import dns.edns import dns.entropy @@ -133,11 +133,11 @@ IndexKeyType = Tuple[ dns.name.Name, dns.rdataclass.RdataClass, dns.rdatatype.RdataType, - Optional[dns.rdatatype.RdataType], - Optional[dns.rdataclass.RdataClass], + dns.rdatatype.RdataType | None, + dns.rdataclass.RdataClass | None, ] IndexType = Dict[IndexKeyType, dns.rrset.RRset] -SectionType = Union[int, str, List[dns.rrset.RRset]] +SectionType = int | str | List[dns.rrset.RRset] class Message: @@ -145,27 +145,27 @@ class Message: _section_enum = MessageSection - def __init__(self, id: Optional[int] = None): + def __init__(self, id: int | None = None): if id is None: self.id = dns.entropy.random_16() else: self.id = id self.flags = 0 self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []] - self.opt: Optional[dns.rrset.RRset] = None + self.opt: dns.rrset.RRset | None = None self.request_payload = 0 self.pad = 0 self.keyring: Any = None - self.tsig: Optional[dns.rrset.RRset] = None + self.tsig: dns.rrset.RRset | None = None self.want_tsig_sign = False self.request_mac = b"" self.xfr = False - self.origin: Optional[dns.name.Name] = None - self.tsig_ctx: Optional[Any] = None + self.origin: dns.name.Name | None = None + self.tsig_ctx: Any | None = None self.index: IndexType = {} self.errors: List[MessageError] = [] self.time = 0.0 - self.wire: Optional[bytes] = None + self.wire: bytes | None = None @property def question(self) -> List[dns.rrset.RRset]: @@ -211,7 +211,7 @@ class Message: def to_text( self, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, **kw: Dict[str, Any], ) -> str: @@ -352,10 +352,10 @@ class Message: rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, - deleting: Optional[dns.rdataclass.RdataClass] = None, + deleting: dns.rdataclass.RdataClass | None = None, create: bool = False, force_unique: bool = False, - idna_codec: Optional[dns.name.IDNACodec] = None, + idna_codec: dns.name.IDNACodec | None = None, ) -> dns.rrset.RRset: """Find the RRset with the given attributes in the specified section. @@ -437,11 +437,11 @@ class Message: rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, - deleting: Optional[dns.rdataclass.RdataClass] = None, + deleting: dns.rdataclass.RdataClass | None = None, create: bool = False, force_unique: bool = False, - idna_codec: Optional[dns.name.IDNACodec] = None, - ) -> Optional[dns.rrset.RRset]: + idna_codec: dns.name.IDNACodec | None = None, + ) -> dns.rrset.RRset | None: """Get the RRset with the given attributes in the specified section. If the RRset is not found, None is returned. @@ -560,10 +560,10 @@ class Message: def to_wire( self, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, max_size: int = 0, multi: bool = False, - tsig_ctx: Optional[Any] = None, + tsig_ctx: Any | None = None, prepend_length: bool = False, prefer_truncation: bool = False, **kw: Dict[str, Any], @@ -680,12 +680,12 @@ class Message: def use_tsig( self, keyring: Any, - keyname: Optional[Union[dns.name.Name, str]] = None, + keyname: dns.name.Name | str | None = None, fudge: int = 300, - original_id: Optional[int] = None, + original_id: int | None = None, tsig_error: int = 0, other_data: bytes = b"", - algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, + algorithm: dns.name.Name | str = dns.tsig.default_algorithm, ) -> None: """When sending, a TSIG signature using the specified key should be added. @@ -750,14 +750,14 @@ class Message: self.want_tsig_sign = True @property - def keyname(self) -> Optional[dns.name.Name]: + def keyname(self) -> dns.name.Name | None: if self.tsig: return self.tsig.name else: return None @property - def keyalgorithm(self) -> Optional[dns.name.Name]: + def keyalgorithm(self) -> dns.name.Name | None: if self.tsig: rdata = cast(dns.rdtypes.ANY.TSIG.TSIG, self.tsig[0]) return rdata.algorithm @@ -765,7 +765,7 @@ class Message: return None @property - def mac(self) -> Optional[bytes]: + def mac(self) -> bytes | None: if self.tsig: rdata = cast(dns.rdtypes.ANY.TSIG.TSIG, self.tsig[0]) return rdata.mac @@ -773,7 +773,7 @@ class Message: return None @property - def tsig_error(self) -> Optional[int]: + def tsig_error(self) -> int | None: if self.tsig: rdata = cast(dns.rdtypes.ANY.TSIG.TSIG, self.tsig[0]) return rdata.error @@ -791,11 +791,11 @@ class Message: def use_edns( self, - edns: Optional[Union[int, bool]] = 0, + edns: int | bool | None = 0, ednsflags: int = 0, payload: int = DEFAULT_EDNS_PAYLOAD, - request_payload: Optional[int] = None, - options: Optional[List[dns.edns.Option]] = None, + request_payload: int | None = None, + options: List[dns.edns.Option] | None = None, pad: int = 0, ) -> None: """Configure EDNS behavior. @@ -985,7 +985,7 @@ class ChainingResult: def __init__( self, canonical_name: dns.name.Name, - answer: Optional[dns.rrset.RRset], + answer: dns.rrset.RRset | None, minimum_ttl: int, cnames: List[dns.rrset.RRset], ): @@ -1305,11 +1305,11 @@ class _WireReader: def from_wire( wire: bytes, - keyring: Optional[Any] = None, - request_mac: Optional[bytes] = b"", + keyring: Any | None = None, + request_mac: bytes | None = b"", xfr: bool = False, - origin: Optional[dns.name.Name] = None, - tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None, + origin: dns.name.Name | None = None, + tsig_ctx: dns.tsig.HMACTSig | dns.tsig.GSSTSig | None = None, multi: bool = False, question_only: bool = False, one_rr_per_rrset: bool = False, @@ -1431,13 +1431,13 @@ class _TextReader: def __init__( self, text: str, - idna_codec: Optional[dns.name.IDNACodec], + idna_codec: dns.name.IDNACodec | None, one_rr_per_rrset: bool = False, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, - relativize_to: Optional[dns.name.Name] = None, + relativize_to: dns.name.Name | None = None, ): - self.message: Optional[Message] = None # mypy: ignore + self.message: Message | None = None # mypy: ignore self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec) self.last_name = None self.one_rr_per_rrset = one_rr_per_rrset @@ -1665,11 +1665,11 @@ class _TextReader: def from_text( text: str, - idna_codec: Optional[dns.name.IDNACodec] = None, + idna_codec: dns.name.IDNACodec | None = None, one_rr_per_rrset: bool = False, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, - relativize_to: Optional[dns.name.Name] = None, + relativize_to: dns.name.Name | None = None, ) -> Message: """Convert the text format message into a message object. @@ -1713,7 +1713,7 @@ def from_text( def from_file( f: Any, - idna_codec: Optional[dns.name.IDNACodec] = None, + idna_codec: dns.name.IDNACodec | None = None, one_rr_per_rrset: bool = False, ) -> Message: """Read the next text format message from the specified file. @@ -1747,17 +1747,17 @@ def from_file( def make_query( - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, - use_edns: Optional[Union[int, bool]] = None, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, + use_edns: int | bool | None = None, want_dnssec: bool = False, - ednsflags: Optional[int] = None, - payload: Optional[int] = None, - request_payload: Optional[int] = None, - options: Optional[List[dns.edns.Option]] = None, - idna_codec: Optional[dns.name.IDNACodec] = None, - id: Optional[int] = None, + ednsflags: int | None = None, + payload: int | None = None, + request_payload: int | None = None, + options: List[dns.edns.Option] | None = None, + idna_codec: dns.name.IDNACodec | None = None, + id: int | None = None, flags: int = dns.flags.RD, pad: int = 0, ) -> QueryMessage: @@ -1861,8 +1861,8 @@ def make_response( our_payload: int = 8192, fudge: int = 300, tsig_error: int = 0, - pad: Optional[int] = None, - copy_mode: Optional[CopyMode] = None, + pad: int | None = None, + copy_mode: CopyMode | None = None, ) -> Message: """Make a message which is a response for the specified query. The message returned is really a response skeleton; it has all of the infrastructure diff --git a/dns/name.py b/dns/name.py index 7b3cf123..45c8f454 100644 --- a/dns/name.py +++ b/dns/name.py @@ -21,7 +21,7 @@ import copy import encodings.idna # type: ignore import functools import struct -from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union +from typing import Any, Callable, Dict, Iterable, Optional, Tuple import dns._features import dns.enum @@ -143,7 +143,7 @@ _escaped = b'"().;\\@$' _escaped_text = '"().;\\@$' -def _escapify(label: Union[bytes, str]) -> str: +def _escapify(label: bytes | str) -> str: """Escape the characters in label which need it. @returns: the escaped string @rtype: string""" @@ -347,7 +347,7 @@ def _validate_labels(labels: Tuple[bytes, ...]) -> None: raise EmptyLabel -def _maybe_convert_to_binary(label: Union[bytes, str]) -> bytes: +def _maybe_convert_to_binary(label: bytes | str) -> bytes: """If label is ``str``, convert it to ``bytes``. If it is already ``bytes`` just return it. @@ -371,7 +371,7 @@ class Name: __slots__ = ["labels"] - def __init__(self, labels: Iterable[Union[bytes, str]]): + def __init__(self, labels: Iterable[bytes | str]): """*labels* is any iterable whose values are ``str`` or ``bytes``.""" blabels = [_maybe_convert_to_binary(x) for x in labels] @@ -595,7 +595,7 @@ class Name: return s def to_unicode( - self, omit_final_dot: bool = False, idna_codec: Optional[IDNACodec] = None + self, omit_final_dot: bool = False, idna_codec: IDNACodec | None = None ) -> str: """Convert name to Unicode text format. @@ -649,11 +649,11 @@ class Name: def to_wire( self, - file: Optional[Any] = None, - compress: Optional[CompressType] = None, + file: Any | None = None, + compress: CompressType | None = None, origin: Optional["Name"] = None, canonicalize: bool = False, - ) -> Optional[bytes]: + ) -> bytes | None: """Convert name to wire format, possibly compressing it. *file* is the file where the name is emitted (typically an @@ -899,7 +899,7 @@ empty = Name([]) def from_unicode( - text: str, origin: Optional[Name] = root, idna_codec: Optional[IDNACodec] = None + text: str, origin: Name | None = root, idna_codec: IDNACodec | None = None ) -> Name: """Convert unicode text into a Name object. @@ -983,9 +983,9 @@ def is_all_ascii(text: str) -> bool: def from_text( - text: Union[bytes, str], - origin: Optional[Name] = root, - idna_codec: Optional[IDNACodec] = None, + text: bytes | str, + origin: Name | None = root, + idna_codec: IDNACodec | None = None, ) -> Name: """Convert text into a Name object. diff --git a/dns/nameserver.py b/dns/nameserver.py index b02a239b..c9307d3e 100644 --- a/dns/nameserver.py +++ b/dns/nameserver.py @@ -1,9 +1,7 @@ -from typing import Optional, Union from urllib.parse import urlparse import dns.asyncbackend import dns.asyncquery -import dns.inet import dns.message import dns.query @@ -31,7 +29,7 @@ class Nameserver: self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool, one_rr_per_rrset: bool = False, @@ -43,7 +41,7 @@ class Nameserver: self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool, backend: dns.asyncbackend.Backend, @@ -87,7 +85,7 @@ class Do53Nameserver(AddressAndPortNameserver): self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool, one_rr_per_rrset: bool = False, @@ -124,7 +122,7 @@ class Do53Nameserver(AddressAndPortNameserver): self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool, backend: dns.asyncbackend.Backend, @@ -165,8 +163,8 @@ class DoHNameserver(Nameserver): def __init__( self, url: str, - bootstrap_address: Optional[str] = None, - verify: Union[bool, str] = True, + bootstrap_address: str | None = None, + verify: bool | str = True, want_get: bool = False, http_version: dns.query.HTTPVersion = dns.query.HTTPVersion.DEFAULT, ): @@ -199,7 +197,7 @@ class DoHNameserver(Nameserver): self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool = False, one_rr_per_rrset: bool = False, @@ -223,7 +221,7 @@ class DoHNameserver(Nameserver): self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool, backend: dns.asyncbackend.Backend, @@ -250,8 +248,8 @@ class DoTNameserver(AddressAndPortNameserver): self, address: str, port: int = 853, - hostname: Optional[str] = None, - verify: Union[bool, str] = True, + hostname: str | None = None, + verify: bool | str = True, ): super().__init__(address, port) self.hostname = hostname @@ -264,7 +262,7 @@ class DoTNameserver(AddressAndPortNameserver): self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool = False, one_rr_per_rrset: bool = False, @@ -285,7 +283,7 @@ class DoTNameserver(AddressAndPortNameserver): self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool, backend: dns.asyncbackend.Backend, @@ -309,8 +307,8 @@ class DoQNameserver(AddressAndPortNameserver): self, address: str, port: int = 853, - verify: Union[bool, str] = True, - server_hostname: Optional[str] = None, + verify: bool | str = True, + server_hostname: str | None = None, ): super().__init__(address, port) self.verify = verify @@ -323,7 +321,7 @@ class DoQNameserver(AddressAndPortNameserver): self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool = False, one_rr_per_rrset: bool = False, @@ -344,7 +342,7 @@ class DoQNameserver(AddressAndPortNameserver): self, request: dns.message.QueryMessage, timeout: float, - source: Optional[str], + source: str | None, source_port: int, max_size: bool, backend: dns.asyncbackend.Backend, diff --git a/dns/node.py b/dns/node.py index de85a82d..b2cbf1b2 100644 --- a/dns/node.py +++ b/dns/node.py @@ -19,14 +19,13 @@ import enum import io -from typing import Any, Dict, Optional +from typing import Any, Dict import dns.immutable import dns.name import dns.rdataclass import dns.rdataset import dns.rdatatype -import dns.renderer import dns.rrset _cname_types = { @@ -210,7 +209,7 @@ class Node: rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, create: bool = False, - ) -> Optional[dns.rdataset.Rdataset]: + ) -> dns.rdataset.Rdataset | None: """Get an rdataset matching the specified properties in the current node. @@ -339,7 +338,7 @@ class ImmutableNode(Node): rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, create: bool = False, - ) -> Optional[dns.rdataset.Rdataset]: + ) -> dns.rdataset.Rdataset | None: if create: raise TypeError("immutable") return super().get_rdataset(rdclass, rdtype, covers, False) diff --git a/dns/query.py b/dns/query.py index ed0a34cf..17b1862d 100644 --- a/dns/query.py +++ b/dns/query.py @@ -28,7 +28,7 @@ import socket import struct import time import urllib.parse -from typing import Any, Callable, Dict, Optional, Tuple, Union, cast +from typing import Any, Callable, Dict, Optional, Tuple, cast import dns._features import dns._tls_util @@ -169,7 +169,7 @@ have_doh = _have_httpx def default_socket_factory( - af: Union[socket.AddressFamily, int], + af: socket.AddressFamily | int, kind: socket.SocketKind, proto: int, ) -> socket.socket: @@ -179,7 +179,7 @@ def default_socket_factory( # Function used to create a socket. Can be overridden if needed in special # situations. socket_factory: Callable[ - [Union[socket.AddressFamily, int], socket.SocketKind, int], socket.socket + [socket.AddressFamily | int, socket.SocketKind, int], socket.socket ] = default_socket_factory @@ -322,9 +322,9 @@ def _destination_and_source( def make_socket( - af: Union[socket.AddressFamily, int], + af: socket.AddressFamily | int, type: socket.SocketKind, - source: Optional[Any] = None, + source: Any | None = None, ) -> socket.socket: """Make a socket. @@ -355,11 +355,11 @@ def make_socket( def make_ssl_socket( - af: Union[socket.AddressFamily, int], + af: socket.AddressFamily | int, type: socket.SocketKind, ssl_context: ssl.SSLContext, - server_hostname: Optional[Union[dns.name.Name, str]] = None, - source: Optional[Any] = None, + server_hostname: dns.name.Name | str | None = None, + source: Any | None = None, ) -> ssl.SSLSocket: """Make a socket. @@ -443,17 +443,17 @@ class HTTPVersion(enum.IntEnum): def https( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 443, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - session: Optional[Any] = None, + session: Any | None = None, path: str = "/dns-query", post: bool = True, - bootstrap_address: Optional[str] = None, - verify: Union[bool, str, ssl.SSLContext] = True, + bootstrap_address: str | None = None, + verify: bool | str | ssl.SSLContext = True, resolver: Optional["dns.resolver.Resolver"] = None, # pyright: ignore family: int = socket.AF_UNSPEC, http_version: HTTPVersion = HTTPVersion.DEFAULT, @@ -677,15 +677,15 @@ def _http3( q: dns.message.Message, where: str, url: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 443, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - verify: Union[bool, str, ssl.SSLContext] = True, + verify: bool | str | ssl.SSLContext = True, post: bool = True, - connection: Optional[dns.quic.SyncQuicConnection] = None, + connection: dns.quic.SyncQuicConnection | None = None, ) -> dns.message.Message: if not dns.quic.have_quic: raise NoDOH("DNS-over-HTTP3 is not available.") # pragma: no cover @@ -763,9 +763,9 @@ def _udp_send(sock, data, destination, expiration): def send_udp( sock: Any, - what: Union[dns.message.Message, bytes], + what: dns.message.Message | bytes, destination: Any, - expiration: Optional[float] = None, + expiration: float | None = None, ) -> Tuple[int, float]: """Send a DNS message to the specified UDP socket. @@ -792,16 +792,16 @@ def send_udp( def receive_udp( sock: Any, - destination: Optional[Any] = None, - expiration: Optional[float] = None, + destination: Any | None = None, + expiration: float | None = None, ignore_unexpected: bool = False, one_rr_per_rrset: bool = False, - keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None, - request_mac: Optional[bytes] = b"", + keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None, + request_mac: bytes | None = b"", ignore_trailing: bool = False, raise_on_truncation: bool = False, ignore_errors: bool = False, - query: Optional[dns.message.Message] = None, + query: dns.message.Message | None = None, ) -> Any: """Read a DNS message from a UDP socket. @@ -899,15 +899,15 @@ def receive_udp( def udp( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 53, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, ignore_unexpected: bool = False, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, raise_on_truncation: bool = False, - sock: Optional[Any] = None, + sock: Any | None = None, ignore_errors: bool = False, ) -> dns.message.Message: """Return the response obtained after sending a query via UDP. @@ -991,15 +991,15 @@ def udp( def udp_with_fallback( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 53, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, ignore_unexpected: bool = False, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - udp_sock: Optional[Any] = None, - tcp_sock: Optional[Any] = None, + udp_sock: Any | None = None, + tcp_sock: Any | None = None, ignore_errors: bool = False, ) -> Tuple[dns.message.Message, bool]: """Return the response to the query, trying UDP first and falling back @@ -1115,8 +1115,8 @@ def _net_write(sock, data, expiration): def send_tcp( sock: Any, - what: Union[dns.message.Message, bytes], - expiration: Optional[float] = None, + what: dns.message.Message | bytes, + expiration: float | None = None, ) -> Tuple[int, float]: """Send a DNS message to the specified TCP socket. @@ -1145,10 +1145,10 @@ def send_tcp( def receive_tcp( sock: Any, - expiration: Optional[float] = None, + expiration: float | None = None, one_rr_per_rrset: bool = False, - keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None, - request_mac: Optional[bytes] = b"", + keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None, + request_mac: bytes | None = b"", ignore_trailing: bool = False, ) -> Tuple[dns.message.Message, float]: """Read a DNS message from a TCP socket. @@ -1204,13 +1204,13 @@ def _connect(s, address, expiration): def tcp( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 53, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - sock: Optional[Any] = None, + sock: Any | None = None, ) -> dns.message.Message: """Return the response obtained after sending a query via TCP. @@ -1283,9 +1283,9 @@ def _tls_handshake(s, expiration): def make_ssl_context( - verify: Union[bool, str] = True, + verify: bool | str = True, check_hostname: bool = True, - alpns: Optional[list[str]] = None, + alpns: list[str] | None = None, ) -> ssl.SSLContext: """Make an SSL context @@ -1317,7 +1317,7 @@ def make_ssl_context( # for backwards compatibility def _make_dot_ssl_context( - server_hostname: Optional[str], verify: Union[bool, str] + server_hostname: str | None, verify: bool | str ) -> ssl.SSLContext: return make_ssl_context(verify, server_hostname is not None, ["dot"]) @@ -1325,16 +1325,16 @@ def _make_dot_ssl_context( def tls( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 853, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - sock: Optional[ssl.SSLSocket] = None, - ssl_context: Optional[ssl.SSLContext] = None, - server_hostname: Optional[str] = None, - verify: Union[bool, str] = True, + sock: ssl.SSLSocket | None = None, + ssl_context: ssl.SSLContext | None = None, + server_hostname: str | None = None, + verify: bool | str = True, ) -> dns.message.Message: """Return the response obtained after sending a query via TLS. @@ -1433,16 +1433,16 @@ def tls( def quic( q: dns.message.Message, where: str, - timeout: Optional[float] = None, + timeout: float | None = None, port: int = 853, - source: Optional[str] = None, + source: str | None = None, source_port: int = 0, one_rr_per_rrset: bool = False, ignore_trailing: bool = False, - connection: Optional[dns.quic.SyncQuicConnection] = None, - verify: Union[bool, str] = True, - hostname: Optional[str] = None, - server_hostname: Optional[str] = None, + connection: dns.quic.SyncQuicConnection | None = None, + verify: bool | str = True, + hostname: str | None = None, + server_hostname: str | None = None, ) -> dns.message.Message: """Return the response obtained after sending a query via DNS-over-QUIC. @@ -1542,11 +1542,11 @@ class UDPMode(enum.IntEnum): def _inbound_xfr( txn_manager: dns.transaction.TransactionManager, - s: Union[socket.socket, ssl.SSLSocket], + s: socket.socket | ssl.SSLSocket, query: dns.message.Message, - serial: Optional[int], - timeout: Optional[float], - expiration: Optional[float], + serial: int | None, + timeout: float | None, + expiration: float | None, ) -> Any: """Given a socket, does the zone transfer.""" rdtype = query.question[0].rdtype @@ -1562,7 +1562,7 @@ def _inbound_xfr( with dns.xfr.Inbound(txn_manager, rdtype, serial, is_udp) as inbound: done = False tsig_ctx = None - r: Optional[dns.message.Message] = None + r: dns.message.Message | None = None while not done: (_, mexpiration) = _compute_times(timeout) if mexpiration is None or ( @@ -1594,20 +1594,20 @@ def _inbound_xfr( def xfr( where: str, - zone: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.AXFR, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, - timeout: Optional[float] = None, + zone: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.AXFR, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, + timeout: float | None = None, port: int = 53, - keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None, - keyname: Optional[Union[dns.name.Name, str]] = None, + keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None, + keyname: dns.name.Name | str | None = None, relativize: bool = True, - lifetime: Optional[float] = None, - source: Optional[str] = None, + lifetime: float | None = None, + source: str | None = None, source_port: int = 0, serial: int = 0, use_udp: bool = False, - keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, + keyalgorithm: dns.name.Name | str = dns.tsig.default_algorithm, ) -> Any: """Return a generator for the responses to a zone transfer. @@ -1713,11 +1713,11 @@ def xfr( def inbound_xfr( where: str, txn_manager: dns.transaction.TransactionManager, - query: Optional[dns.message.Message] = None, + query: dns.message.Message | None = None, port: int = 53, - timeout: Optional[float] = None, - lifetime: Optional[float] = None, - source: Optional[str] = None, + timeout: float | None = None, + lifetime: float | None = None, + source: str | None = None, source_port: int = 0, udp_mode: UDPMode = UDPMode.NEVER, ) -> None: diff --git a/dns/quic/__init__.py b/dns/quic/__init__.py index e371c417..9bce6346 100644 --- a/dns/quic/__init__.py +++ b/dns/quic/__init__.py @@ -6,19 +6,15 @@ import dns._features import dns.asyncbackend if dns._features.have("doq"): - import aioquic.quic.configuration # type: ignore - from dns._asyncbackend import NullContext - from dns.quic._asyncio import ( - AsyncioQuicConnection, - AsyncioQuicManager, - AsyncioQuicStream, - ) + from dns.quic._asyncio import AsyncioQuicConnection as AsyncioQuicConnection + from dns.quic._asyncio import AsyncioQuicManager + from dns.quic._asyncio import AsyncioQuicStream as AsyncioQuicStream from dns.quic._common import AsyncQuicConnection # pyright: ignore - from dns.quic._common import AsyncQuicManager + from dns.quic._common import AsyncQuicManager as AsyncQuicManager from dns.quic._sync import SyncQuicConnection # pyright: ignore from dns.quic._sync import SyncQuicStream # pyright: ignore - from dns.quic._sync import SyncQuicManager + from dns.quic._sync import SyncQuicManager as SyncQuicManager have_quic = True @@ -43,11 +39,11 @@ if dns._features.have("doq"): if dns._features.have("trio"): import trio - from dns.quic._trio import ( # pylint: disable=ungrouped-imports - TrioQuicConnection, - TrioQuicManager, - TrioQuicStream, + from dns.quic._trio import ( + TrioQuicConnection as TrioQuicConnection, # pylint: disable=ungrouped-imports ) + from dns.quic._trio import TrioQuicManager + from dns.quic._trio import TrioQuicStream as TrioQuicStream def _trio_context_factory(): return trio.open_nursery() diff --git a/dns/quic/_common.py b/dns/quic/_common.py index d21ceea6..ba9d2454 100644 --- a/dns/quic/_common.py +++ b/dns/quic/_common.py @@ -7,10 +7,9 @@ import socket import struct import time import urllib.parse -from typing import Any, Optional +from typing import Any import aioquic.h3.connection # type: ignore -import aioquic.h3.events # type: ignore import aioquic.quic.configuration # type: ignore import aioquic.quic.connection # type: ignore @@ -218,7 +217,7 @@ class BaseQuicConnection: class AsyncQuicConnection(BaseQuicConnection): - async def make_stream(self, timeout: Optional[float] = None) -> Any: + async def make_stream(self, timeout: float | None = None) -> Any: pass diff --git a/dns/rdata.py b/dns/rdata.py index 4f4c6205..c4522e68 100644 --- a/dns/rdata.py +++ b/dns/rdata.py @@ -25,7 +25,7 @@ import ipaddress import itertools import random from importlib import import_module -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, Tuple import dns.exception import dns.immutable @@ -202,7 +202,7 @@ class Rdata: def to_text( self, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, **kw: Dict[str, Any], ) -> str: @@ -216,19 +216,19 @@ class Rdata: def _to_wire( self, file: Any, - compress: Optional[dns.name.CompressType] = None, - origin: Optional[dns.name.Name] = None, + compress: dns.name.CompressType | None = None, + origin: dns.name.Name | None = None, canonicalize: bool = False, ) -> None: raise NotImplementedError # pragma: no cover def to_wire( self, - file: Optional[Any] = None, - compress: Optional[dns.name.CompressType] = None, - origin: Optional[dns.name.Name] = None, + file: Any | None = None, + compress: dns.name.CompressType | None = None, + origin: dns.name.Name | None = None, canonicalize: bool = False, - ) -> Optional[bytes]: + ) -> bytes | None: """Convert an rdata to wire format. Returns a ``bytes`` if no output file was specified, or ``None`` otherwise. @@ -246,7 +246,7 @@ class Rdata: self._to_wire(f, compress, origin, canonicalize) return f.getvalue() - def to_generic(self, origin: Optional[dns.name.Name] = None) -> "GenericRdata": + def to_generic(self, origin: dns.name.Name | None = None) -> "GenericRdata": """Creates a dns.rdata.GenericRdata equivalent of this rdata. Returns a ``dns.rdata.GenericRdata``. @@ -255,7 +255,7 @@ class Rdata: assert wire is not None # for type checkers return GenericRdata(self.rdclass, self.rdtype, wire) - def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes: + def to_digestable(self, origin: dns.name.Name | None = None) -> bytes: """Convert rdata to a format suitable for digesting in hashes. This is also the DNSSEC canonical form. @@ -408,9 +408,9 @@ class Rdata: rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, tok: dns.tokenizer.Tokenizer, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, - relativize_to: Optional[dns.name.Name] = None, + relativize_to: dns.name.Name | None = None, ) -> "Rdata": raise NotImplementedError # pragma: no cover @@ -420,7 +420,7 @@ class Rdata: rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, parser: dns.wire.Parser, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, ) -> "Rdata": raise NotImplementedError # pragma: no cover @@ -482,7 +482,7 @@ class Rdata: cls, value: Any, encode: bool = False, - max_length: Optional[int] = None, + max_length: int | None = None, empty_ok: bool = True, ) -> bytes: if encode and isinstance(value, str): @@ -631,7 +631,7 @@ class GenericRdata(Rdata): def to_text( self, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, **kw: Dict[str, Any], ) -> str: @@ -654,7 +654,7 @@ class GenericRdata(Rdata): def _to_wire(self, file, compress=None, origin=None, canonicalize=False): file.write(self.data) - def to_generic(self, origin: Optional[dns.name.Name] = None) -> "GenericRdata": + def to_generic(self, origin: dns.name.Name | None = None) -> "GenericRdata": return self @classmethod @@ -722,13 +722,13 @@ def load_all_types(disable_dynamic_load=True): def from_text( - rdclass: Union[dns.rdataclass.RdataClass, str], - rdtype: Union[dns.rdatatype.RdataType, str], - tok: Union[dns.tokenizer.Tokenizer, str], - origin: Optional[dns.name.Name] = None, + rdclass: dns.rdataclass.RdataClass | str, + rdtype: dns.rdatatype.RdataType | str, + tok: dns.tokenizer.Tokenizer | str, + origin: dns.name.Name | None = None, relativize: bool = True, - relativize_to: Optional[dns.name.Name] = None, - idna_codec: Optional[dns.name.IDNACodec] = None, + relativize_to: dns.name.Name | None = None, + idna_codec: dns.name.IDNACodec | None = None, ) -> Rdata: """Build an rdata object from text format. @@ -815,10 +815,10 @@ def from_text( def from_wire_parser( - rdclass: Union[dns.rdataclass.RdataClass, str], - rdtype: Union[dns.rdatatype.RdataType, str], + rdclass: dns.rdataclass.RdataClass | str, + rdtype: dns.rdatatype.RdataType | str, parser: dns.wire.Parser, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, ) -> Rdata: """Build an rdata object from wire format @@ -852,12 +852,12 @@ def from_wire_parser( def from_wire( - rdclass: Union[dns.rdataclass.RdataClass, str], - rdtype: Union[dns.rdatatype.RdataType, str], + rdclass: dns.rdataclass.RdataClass | str, + rdtype: dns.rdatatype.RdataType | str, wire: bytes, current: int, rdlen: int, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, ) -> Rdata: """Build an rdata object from wire format diff --git a/dns/rdataset.py b/dns/rdataset.py index 006bedbb..1edf67d7 100644 --- a/dns/rdataset.py +++ b/dns/rdataset.py @@ -20,7 +20,7 @@ import io import random import struct -from typing import Any, Collection, Dict, List, Optional, Union, cast +from typing import Any, Collection, Dict, List, cast import dns.exception import dns.immutable @@ -99,7 +99,7 @@ class Rdataset(dns.set.Set): # pylint: disable=arguments-differ,arguments-renamed def add( # pyright: ignore - self, rd: dns.rdata.Rdata, ttl: Optional[int] = None + self, rd: dns.rdata.Rdata, ttl: int | None = None ) -> None: """Add the specified rdata to the rdataset. @@ -198,10 +198,10 @@ class Rdataset(dns.set.Set): def to_text( self, - name: Optional[dns.name.Name] = None, - origin: Optional[dns.name.Name] = None, + name: dns.name.Name | None = None, + origin: dns.name.Name | None = None, relativize: bool = True, - override_rdclass: Optional[dns.rdataclass.RdataClass] = None, + override_rdclass: dns.rdataclass.RdataClass | None = None, want_comments: bool = False, **kw: Dict[str, Any], ) -> str: @@ -274,9 +274,9 @@ class Rdataset(dns.set.Set): self, name: dns.name.Name, file: Any, - compress: Optional[dns.name.CompressType] = None, - origin: Optional[dns.name.Name] = None, - override_rdclass: Optional[dns.rdataclass.RdataClass] = None, + compress: dns.name.CompressType | None = None, + origin: dns.name.Name | None = None, + override_rdclass: dns.rdataclass.RdataClass | None = None, want_shuffle: bool = True, ) -> int: """Convert the rdataset to wire format. @@ -313,7 +313,7 @@ class Rdataset(dns.set.Set): file.write(struct.pack("!HHIH", self.rdtype, rdclass, 0, 0)) return 1 else: - l: Union[Rdataset, List[dns.rdata.Rdata]] + l: Rdataset | List[dns.rdata.Rdata] if want_shuffle: l = list(self) random.shuffle(l) @@ -425,14 +425,14 @@ class ImmutableRdataset(Rdataset): # lgtm[py/missing-equals] def from_text_list( - rdclass: Union[dns.rdataclass.RdataClass, str], - rdtype: Union[dns.rdatatype.RdataType, str], + rdclass: dns.rdataclass.RdataClass | str, + rdtype: dns.rdatatype.RdataType | str, ttl: int, text_rdatas: Collection[str], - idna_codec: Optional[dns.name.IDNACodec] = None, - origin: Optional[dns.name.Name] = None, + idna_codec: dns.name.IDNACodec | None = None, + origin: dns.name.Name | None = None, relativize: bool = True, - relativize_to: Optional[dns.name.Name] = None, + relativize_to: dns.name.Name | None = None, ) -> Rdataset: """Create an rdataset with the specified class, type, and TTL, and with the specified list of rdatas in text format. @@ -465,8 +465,8 @@ def from_text_list( def from_text( - rdclass: Union[dns.rdataclass.RdataClass, str], - rdtype: Union[dns.rdatatype.RdataType, str], + rdclass: dns.rdataclass.RdataClass | str, + rdtype: dns.rdatatype.RdataType | str, ttl: int, *text_rdatas: Any, ) -> Rdataset: diff --git a/dns/rdtypes/txtbase.py b/dns/rdtypes/txtbase.py index 6ecdd35f..5e5b24f5 100644 --- a/dns/rdtypes/txtbase.py +++ b/dns/rdtypes/txtbase.py @@ -17,7 +17,7 @@ """TXT-like base class.""" -from typing import Any, Dict, Iterable, Optional, Tuple, Union +from typing import Any, Dict, Iterable, Tuple import dns.exception import dns.immutable @@ -39,7 +39,7 @@ class TXTBase(dns.rdata.Rdata): self, rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, - strings: Iterable[Union[bytes, str]], + strings: Iterable[bytes | str], ): """Initialize a TXT-like rdata. @@ -58,7 +58,7 @@ class TXTBase(dns.rdata.Rdata): def to_text( self, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, **kw: Dict[str, Any], ) -> str: @@ -75,9 +75,9 @@ class TXTBase(dns.rdata.Rdata): rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, tok: dns.tokenizer.Tokenizer, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, - relativize_to: Optional[dns.name.Name] = None, + relativize_to: dns.name.Name | None = None, ) -> dns.rdata.Rdata: strings = [] for token in tok.get_remaining(): diff --git a/dns/rdtypes/util.py b/dns/rdtypes/util.py index ee6e8aca..c17b154b 100644 --- a/dns/rdtypes/util.py +++ b/dns/rdtypes/util.py @@ -18,7 +18,7 @@ import collections import random import struct -from typing import Any, Iterable, List, Optional, Tuple, Union +from typing import Any, Iterable, List, Tuple import dns.exception import dns.ipv4 @@ -35,7 +35,7 @@ class Gateway: name = "" - def __init__(self, type: Any, gateway: Optional[Union[str, dns.name.Name]] = None): + def __init__(self, type: Any, gateway: str | dns.name.Name | None = None): self.type = dns.rdata.Rdata._as_uint8(type) self.gateway = gateway self._check() @@ -126,7 +126,7 @@ class Bitmap: type_name = "" - def __init__(self, windows: Optional[Iterable[Tuple[int, bytes]]] = None): + def __init__(self, windows: Iterable[Tuple[int, bytes]] | None = None): last_window = -1 if windows is None: windows = [] diff --git a/dns/resolver.py b/dns/resolver.py index 29133635..923bb4b4 100644 --- a/dns/resolver.py +++ b/dns/resolver.py @@ -24,7 +24,7 @@ import sys import threading import time import warnings -from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast +from typing import Any, Dict, Iterator, List, Sequence, Tuple, cast from urllib.parse import urlparse import dns._ddr @@ -65,7 +65,7 @@ class NXDOMAIN(dns.exception.DNSException): super().__init__(*args, **kwargs) def _check_kwargs(self, qnames, responses=None): # pyright: ignore - if not isinstance(qnames, (list, tuple, set)): + if not isinstance(qnames, list | tuple | set): raise AttributeError("qnames must be a list, tuple or set") if len(qnames) == 0: raise AttributeError("qnames must contain at least one element") @@ -144,11 +144,11 @@ class YXDOMAIN(dns.exception.DNSException): ErrorTuple = Tuple[ - Optional[str], + str | None, bool, int, - Union[Exception, str], - Optional[dns.message.Message], + Exception | str, + dns.message.Message | None, ] @@ -266,8 +266,8 @@ class Answer: rdtype: dns.rdatatype.RdataType, rdclass: dns.rdataclass.RdataClass, response: dns.message.QueryMessage, - nameserver: Optional[str] = None, - port: Optional[int] = None, + nameserver: str | None = None, + port: int | None = None, ) -> None: self.qname = qname self.rdtype = rdtype @@ -330,8 +330,8 @@ class HostAnswers(Answers): @classmethod def make( cls, - v6: Optional[Answer] = None, - v4: Optional[Answer] = None, + v6: Answer | None = None, + v4: Answer | None = None, add_empty: bool = True, ) -> "HostAnswers": answers = HostAnswers() @@ -449,7 +449,7 @@ class Cache(CacheBase): now = time.time() self.next_cleaning = now + self.cleaning_interval - def get(self, key: CacheKey) -> Optional[Answer]: + def get(self, key: CacheKey) -> Answer | None: """Get the answer associated with *key*. Returns None if no answer is cached for the key. @@ -482,7 +482,7 @@ class Cache(CacheBase): self._maybe_clean() self.data[key] = value - def flush(self, key: Optional[CacheKey] = None) -> None: + def flush(self, key: CacheKey | None = None) -> None: """Flush the cache. If *key* is not ``None``, only that item is flushed. Otherwise the entire cache @@ -549,7 +549,7 @@ class LRUCache(CacheBase): max_size = 1 self.max_size = max_size - def get(self, key: CacheKey) -> Optional[Answer]: + def get(self, key: CacheKey) -> Answer | None: """Get the answer associated with *key*. Returns None if no answer is cached for the key. @@ -608,7 +608,7 @@ class LRUCache(CacheBase): node.link_after(self.sentinel) self.data[key] = node - def flush(self, key: Optional[CacheKey] = None) -> None: + def flush(self, key: CacheKey | None = None) -> None: """Flush the cache. If *key* is not ``None``, only that item is flushed. Otherwise the entire cache @@ -648,12 +648,12 @@ class _Resolution: def __init__( self, resolver: "BaseResolver", - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - rdclass: Union[dns.rdataclass.RdataClass, str], + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + rdclass: dns.rdataclass.RdataClass | str, tcp: bool, raise_on_no_answer: bool, - search: Optional[bool], + search: bool | None, ) -> None: if isinstance(qname, str): qname = dns.name.from_text(qname, None) @@ -676,15 +676,15 @@ class _Resolution: self.nameservers: List[dns.nameserver.Nameserver] = [] self.current_nameservers: List[dns.nameserver.Nameserver] = [] self.errors: List[ErrorTuple] = [] - self.nameserver: Optional[dns.nameserver.Nameserver] = None + self.nameserver: dns.nameserver.Nameserver | None = None self.tcp_attempt = False self.retry_with_tcp = False - self.request: Optional[dns.message.QueryMessage] = None + self.request: dns.message.QueryMessage | None = None self.backoff = 0.0 def next_request( self, - ) -> Tuple[Optional[dns.message.QueryMessage], Optional[Answer]]: + ) -> Tuple[dns.message.QueryMessage | None, Answer | None]: """Get the next request to send, and check the cache. Returns a (request, answer) tuple. At most one of request or @@ -779,8 +779,8 @@ class _Resolution: return (self.nameserver, self.tcp_attempt, backoff) def query_result( - self, response: Optional[dns.message.Message], ex: Optional[Exception] - ) -> Tuple[Optional[Answer], bool]: + self, response: dns.message.Message | None, ex: Exception | None + ) -> Tuple[Answer | None, bool]: # # returns an (answer: Answer, end_loop: bool) tuple. # @@ -917,19 +917,19 @@ class BaseResolver: use_search_by_default: bool timeout: float lifetime: float - keyring: Optional[Any] - keyname: Optional[Union[dns.name.Name, str]] - keyalgorithm: Union[dns.name.Name, str] + keyring: Any | None + keyname: dns.name.Name | str | None + keyalgorithm: dns.name.Name | str edns: int ednsflags: int - ednsoptions: Optional[List[dns.edns.Option]] + ednsoptions: List[dns.edns.Option] | None payload: int cache: Any - flags: Optional[int] + flags: int | None retry_servfail: bool rotate: bool - ndots: Optional[int] - _nameservers: Sequence[Union[str, dns.nameserver.Nameserver]] + ndots: int | None + _nameservers: Sequence[str | dns.nameserver.Nameserver] def __init__( self, filename: str = "/etc/resolv.conf", configure: bool = True @@ -1063,8 +1063,8 @@ class BaseResolver: def _compute_timeout( self, start: float, - lifetime: Optional[float] = None, - errors: Optional[List[ErrorTuple]] = None, + lifetime: float | None = None, + errors: List[ErrorTuple] | None = None, ) -> float: lifetime = self.lifetime if lifetime is None else lifetime now = time.time() @@ -1085,7 +1085,7 @@ class BaseResolver: return min(lifetime - duration, self.timeout) def _get_qnames_to_try( - self, qname: dns.name.Name, search: Optional[bool] + self, qname: dns.name.Name, search: bool | None ) -> List[dns.name.Name]: # This is a separate method so we can unit test the search # rules without requiring the Internet. @@ -1128,8 +1128,8 @@ class BaseResolver: def use_tsig( self, keyring: Any, - keyname: Optional[Union[dns.name.Name, str]] = None, - algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, + keyname: dns.name.Name | str | None = None, + algorithm: dns.name.Name | str = dns.tsig.default_algorithm, ) -> None: """Add a TSIG signature to each query. @@ -1143,10 +1143,10 @@ class BaseResolver: def use_edns( self, - edns: Optional[Union[int, bool]] = 0, + edns: int | bool | None = 0, ednsflags: int = 0, payload: int = dns.message.DEFAULT_EDNS_PAYLOAD, - options: Optional[List[dns.edns.Option]] = None, + options: List[dns.edns.Option] | None = None, ) -> None: """Configure EDNS behavior. @@ -1185,12 +1185,12 @@ class BaseResolver: @classmethod def _enrich_nameservers( cls, - nameservers: Sequence[Union[str, dns.nameserver.Nameserver]], + nameservers: Sequence[str | dns.nameserver.Nameserver], nameserver_ports: Dict[str, int], default_port: int, ) -> List[dns.nameserver.Nameserver]: enriched_nameservers = [] - if isinstance(nameservers, (list, tuple)): + if isinstance(nameservers, list | tuple): for nameserver in nameservers: enriched_nameserver: dns.nameserver.Nameserver if isinstance(nameserver, dns.nameserver.Nameserver): @@ -1221,12 +1221,12 @@ class BaseResolver: @property def nameservers( self, - ) -> Sequence[Union[str, dns.nameserver.Nameserver]]: + ) -> Sequence[str | dns.nameserver.Nameserver]: return self._nameservers @nameservers.setter def nameservers( - self, nameservers: Sequence[Union[str, dns.nameserver.Nameserver]] + self, nameservers: Sequence[str | dns.nameserver.Nameserver] ) -> None: """ *nameservers*, a ``list`` or ``tuple`` of nameservers, where a nameserver is either @@ -1245,15 +1245,15 @@ class Resolver(BaseResolver): def resolve( self, - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, tcp: bool = False, - source: Optional[str] = None, + source: str | None = None, raise_on_no_answer: bool = True, source_port: int = 0, - lifetime: Optional[float] = None, - search: Optional[bool] = None, + lifetime: float | None = None, + search: bool | None = None, ) -> Answer: # pylint: disable=arguments-differ """Query nameservers to find the answer to the question. @@ -1347,14 +1347,14 @@ class Resolver(BaseResolver): def query( self, - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, tcp: bool = False, - source: Optional[str] = None, + source: str | None = None, raise_on_no_answer: bool = True, source_port: int = 0, - lifetime: Optional[float] = None, + lifetime: float | None = None, ) -> Answer: # pragma: no cover """Query nameservers to find the answer to the question. @@ -1406,7 +1406,7 @@ class Resolver(BaseResolver): def resolve_name( self, - name: Union[dns.name.Name, str], + name: dns.name.Name | str, family: int = socket.AF_UNSPEC, **kwargs: Any, ) -> HostAnswers: @@ -1471,7 +1471,7 @@ class Resolver(BaseResolver): # pylint: disable=redefined-outer-name - def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name: + def canonical_name(self, name: dns.name.Name | str) -> dns.name.Name: """Determine the canonical name of *name*. The canonical name is the name the resolver uses for queries @@ -1527,7 +1527,7 @@ class Resolver(BaseResolver): #: The default resolver. -default_resolver: Optional[Resolver] = None +default_resolver: Resolver | None = None def get_default_resolver() -> Resolver: @@ -1550,15 +1550,15 @@ def reset_default_resolver() -> None: def resolve( - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, tcp: bool = False, - source: Optional[str] = None, + source: str | None = None, raise_on_no_answer: bool = True, source_port: int = 0, - lifetime: Optional[float] = None, - search: Optional[bool] = None, + lifetime: float | None = None, + search: bool | None = None, ) -> Answer: # pragma: no cover """Query nameservers to find the answer to the question. @@ -1583,14 +1583,14 @@ def resolve( def query( - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, tcp: bool = False, - source: Optional[str] = None, + source: str | None = None, raise_on_no_answer: bool = True, source_port: int = 0, - lifetime: Optional[float] = None, + lifetime: float | None = None, ) -> Answer: # pragma: no cover """Query nameservers to find the answer to the question. @@ -1626,7 +1626,7 @@ def resolve_address(ipaddr: str, *args: Any, **kwargs: Any) -> Answer: def resolve_name( - name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any + name: dns.name.Name | str, family: int = socket.AF_UNSPEC, **kwargs: Any ) -> HostAnswers: """Use a resolver to query for address records. @@ -1637,7 +1637,7 @@ def resolve_name( return get_default_resolver().resolve_name(name, family, **kwargs) -def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name: +def canonical_name(name: dns.name.Name | str) -> dns.name.Name: """Determine the canonical name of *name*. See ``dns.resolver.Resolver.canonical_name`` for more information on the @@ -1658,11 +1658,11 @@ def try_ddr(lifetime: float = 5.0) -> None: # pragma: no cover def zone_for_name( - name: Union[dns.name.Name, str], + name: dns.name.Name | str, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, tcp: bool = False, - resolver: Optional[Resolver] = None, - lifetime: Optional[float] = None, + resolver: Resolver | None = None, + lifetime: float | None = None, ) -> dns.name.Name: # pyright: ignore[reportReturnType] """Find the name of the zone which contains the specified name. @@ -1696,14 +1696,14 @@ def zone_for_name( if not name.is_absolute(): raise NotAbsolute(name) start = time.time() - expiration: Optional[float] + expiration: float | None if lifetime is not None: expiration = start + lifetime else: expiration = None while 1: try: - rlifetime: Optional[float] + rlifetime: float | None if expiration is not None: rlifetime = expiration - time.time() if rlifetime <= 0: @@ -1743,10 +1743,10 @@ def zone_for_name( def make_resolver_at( - where: Union[dns.name.Name, str], + where: dns.name.Name | str, port: int = 53, family: int = socket.AF_UNSPEC, - resolver: Optional[Resolver] = None, + resolver: Resolver | None = None, ) -> Resolver: """Make a stub resolver using the specified destination as the full resolver. @@ -1767,7 +1767,7 @@ def make_resolver_at( """ if resolver is None: resolver = get_default_resolver() - nameservers: List[Union[str, dns.nameserver.Nameserver]] = [] + nameservers: List[str | dns.nameserver.Nameserver] = [] if isinstance(where, str) and dns.inet.is_address(where): nameservers.append(dns.nameserver.Do53Nameserver(where, port)) else: @@ -1779,19 +1779,19 @@ def make_resolver_at( def resolve_at( - where: Union[dns.name.Name, str], - qname: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, - rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, + where: dns.name.Name | str, + qname: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A, + rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, tcp: bool = False, - source: Optional[str] = None, + source: str | None = None, raise_on_no_answer: bool = True, source_port: int = 0, - lifetime: Optional[float] = None, - search: Optional[bool] = None, + lifetime: float | None = None, + search: bool | None = None, port: int = 53, family: int = socket.AF_UNSPEC, - resolver: Optional[Resolver] = None, + resolver: Resolver | None = None, ) -> Answer: """Query nameservers to find the answer to the question. @@ -1829,7 +1829,7 @@ _protocols_for_socktype: Dict[Any, List[Any]] = { socket.SOCK_STREAM: [socket.SOL_TCP], } -_resolver: Optional[Resolver] = None +_resolver: Resolver | None = None _original_getaddrinfo = socket.getaddrinfo _original_getnameinfo = socket.getnameinfo _original_getfqdn = socket.getfqdn @@ -2029,7 +2029,7 @@ def _gethostbyaddr(ip): return (canonical, aliases, addresses) -def override_system_resolver(resolver: Optional[Resolver] = None) -> None: +def override_system_resolver(resolver: Resolver | None = None) -> None: """Override the system resolver routines in the socket module with versions which use dnspython's resolver. diff --git a/dns/rrset.py b/dns/rrset.py index 2b0effaa..271ddbe8 100644 --- a/dns/rrset.py +++ b/dns/rrset.py @@ -17,7 +17,7 @@ """DNS RRsets (an RRset is a named rdataset)""" -from typing import Any, Collection, Dict, Optional, Union, cast +from typing import Any, Collection, Dict, cast import dns.name import dns.rdata @@ -45,7 +45,7 @@ class RRset(dns.rdataset.Rdataset): rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, - deleting: Optional[dns.rdataclass.RdataClass] = None, + deleting: dns.rdataclass.RdataClass | None = None, ): """Create a new RRset.""" @@ -116,7 +116,7 @@ class RRset(dns.rdataset.Rdataset): rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType, - deleting: Optional[dns.rdataclass.RdataClass] = None, + deleting: dns.rdataclass.RdataClass | None = None, ) -> bool: """Returns ``True`` if this rrset matches the specified name, class, type, covers, and deletion state. @@ -131,7 +131,7 @@ class RRset(dns.rdataset.Rdataset): def to_text( # type: ignore[override] self, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = True, **kw: Dict[str, Any], ) -> str: @@ -158,8 +158,8 @@ class RRset(dns.rdataset.Rdataset): def to_wire( # type: ignore[override] self, file: Any, - compress: Optional[dns.name.CompressType] = None, # type: ignore - origin: Optional[dns.name.Name] = None, + compress: dns.name.CompressType | None = None, # type: ignore + origin: dns.name.Name | None = None, **kw: Dict[str, Any], ) -> int: """Convert the RRset to wire format. @@ -185,15 +185,15 @@ class RRset(dns.rdataset.Rdataset): def from_text_list( - name: Union[dns.name.Name, str], + name: dns.name.Name | str, ttl: int, - rdclass: Union[dns.rdataclass.RdataClass, str], - rdtype: Union[dns.rdatatype.RdataType, str], + rdclass: dns.rdataclass.RdataClass | str, + rdtype: dns.rdatatype.RdataType | str, text_rdatas: Collection[str], - idna_codec: Optional[dns.name.IDNACodec] = None, - origin: Optional[dns.name.Name] = None, + idna_codec: dns.name.IDNACodec | None = None, + origin: dns.name.Name | None = None, relativize: bool = True, - relativize_to: Optional[dns.name.Name] = None, + relativize_to: dns.name.Name | None = None, ) -> RRset: """Create an RRset with the specified name, TTL, class, and type, and with the specified list of rdatas in text format. @@ -228,10 +228,10 @@ def from_text_list( def from_text( - name: Union[dns.name.Name, str], + name: dns.name.Name | str, ttl: int, - rdclass: Union[dns.rdataclass.RdataClass, str], - rdtype: Union[dns.rdatatype.RdataType, str], + rdclass: dns.rdataclass.RdataClass | str, + rdtype: dns.rdatatype.RdataType | str, *text_rdatas: Any, ) -> RRset: """Create an RRset with the specified name, TTL, class, and type and with @@ -246,10 +246,10 @@ def from_text( def from_rdata_list( - name: Union[dns.name.Name, str], + name: dns.name.Name | str, ttl: int, rdatas: Collection[dns.rdata.Rdata], - idna_codec: Optional[dns.name.IDNACodec] = None, + idna_codec: dns.name.IDNACodec | None = None, ) -> RRset: """Create an RRset with the specified name and TTL, and with the specified list of rdata objects. @@ -277,7 +277,7 @@ def from_rdata_list( return r -def from_rdata(name: Union[dns.name.Name, str], ttl: int, *rdatas: Any) -> RRset: +def from_rdata(name: dns.name.Name | str, ttl: int, *rdatas: Any) -> RRset: """Create an RRset with the specified name and TTL, and with the specified rdata objects. diff --git a/dns/tokenizer.py b/dns/tokenizer.py index c958e353..86ae3e2d 100644 --- a/dns/tokenizer.py +++ b/dns/tokenizer.py @@ -19,7 +19,7 @@ import io import sys -from typing import Any, List, Optional, Tuple +from typing import Any, List, Tuple import dns.exception import dns.name @@ -54,7 +54,7 @@ class Token: ttype: int, value: Any = "", has_escape: bool = False, - comment: Optional[str] = None, + comment: str | None = None, ): """Initialize a token instance.""" @@ -231,8 +231,8 @@ class Tokenizer: def __init__( self, f: Any = sys.stdin, - filename: Optional[str] = None, - idna_codec: Optional[dns.name.IDNACodec] = None, + filename: str | None = None, + idna_codec: dns.name.IDNACodec | None = None, ): """Initialize a tokenizer instance. @@ -263,8 +263,8 @@ class Tokenizer: else: filename = "" self.file = f - self.ungotten_char: Optional[str] = None - self.ungotten_token: Optional[Token] = None + self.ungotten_char: str | None = None + self.ungotten_token: Token | None = None self.multiline = 0 self.quoting = False self.eof = False @@ -566,7 +566,7 @@ class Tokenizer: ) return value - def get_string(self, max_length: Optional[int] = None) -> str: + def get_string(self, max_length: int | None = None) -> str: """Read the next token and interpret it as a string. Raises dns.exception.SyntaxError if not a string. @@ -596,7 +596,7 @@ class Tokenizer: raise dns.exception.SyntaxError("expecting an identifier") return token.value - def get_remaining(self, max_tokens: Optional[int] = None) -> List[Token]: + def get_remaining(self, max_tokens: int | None = None) -> List[Token]: """Return the remaining tokens on the line, until an EOL or EOF is seen. max_tokens: If not None, stop after this number of tokens. @@ -643,9 +643,9 @@ class Tokenizer: def as_name( self, token: Token, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = False, - relativize_to: Optional[dns.name.Name] = None, + relativize_to: dns.name.Name | None = None, ) -> dns.name.Name: """Try to interpret the token as a DNS name. @@ -660,9 +660,9 @@ class Tokenizer: def get_name( self, - origin: Optional[dns.name.Name] = None, + origin: dns.name.Name | None = None, relativize: bool = False, - relativize_to: Optional[dns.name.Name] = None, + relativize_to: dns.name.Name | None = None, ) -> dns.name.Name: """Read the next token and interpret it as a DNS name. diff --git a/dns/transaction.py b/dns/transaction.py index bcdda9e0..9ecd7377 100644 --- a/dns/transaction.py +++ b/dns/transaction.py @@ -1,7 +1,7 @@ # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license import collections -from typing import Any, Callable, Iterator, List, Optional, Tuple, Union +from typing import Any, Callable, Iterator, List, Tuple import dns.exception import dns.name @@ -32,7 +32,7 @@ class TransactionManager: def origin_information( self, - ) -> Tuple[Optional[dns.name.Name], bool, Optional[dns.name.Name]]: + ) -> Tuple[dns.name.Name | None, bool, dns.name.Name | None]: """Returns a tuple (absolute_origin, relativize, effective_origin) @@ -61,7 +61,7 @@ class TransactionManager: """The class of the transaction manager.""" raise NotImplementedError # pragma: no cover - def from_wire_origin(self) -> Optional[dns.name.Name]: + def from_wire_origin(self) -> dns.name.Name | None: """Origin to use in from_wire() calls.""" (absolute_origin, relativize, _) = self.origin_information() if relativize: @@ -128,9 +128,9 @@ class Transaction: def get( self, - name: Optional[Union[dns.name.Name, str]], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + name: dns.name.Name | str | None, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, ) -> dns.rdataset.Rdataset: """Return the rdataset associated with *name*, *rdtype*, and *covers*, or `None` if not found. @@ -145,7 +145,7 @@ class Transaction: rdataset = self._get_rdataset(name, rdtype, covers) return _ensure_immutable_rdataset(rdataset) - def get_node(self, name: dns.name.Name) -> Optional[dns.node.Node]: + def get_node(self, name: dns.name.Name) -> dns.node.Node | None: """Return the node at *name*, if any. Returns an immutable node or ``None``. @@ -237,7 +237,7 @@ class Transaction: self._check_read_only() self._delete(True, args) - def name_exists(self, name: Union[dns.name.Name, str]) -> bool: + def name_exists(self, name: dns.name.Name | str) -> bool: """Does the specified name exist?""" self._check_ended() if isinstance(name, str): diff --git a/dns/tsig.py b/dns/tsig.py index 7070b0a0..333f9aa4 100644 --- a/dns/tsig.py +++ b/dns/tsig.py @@ -21,7 +21,6 @@ import base64 import hashlib import hmac import struct -from typing import Union import dns.exception import dns.name @@ -330,9 +329,9 @@ def get_context(key): class Key: def __init__( self, - name: Union[dns.name.Name, str], - secret: Union[bytes, str], - algorithm: Union[dns.name.Name, str] = default_algorithm, + name: dns.name.Name | str, + secret: bytes | str, + algorithm: dns.name.Name | str = default_algorithm, ): if isinstance(name, str): name = dns.name.from_text(name) diff --git a/dns/ttl.py b/dns/ttl.py index 06c11eef..16289cd0 100644 --- a/dns/ttl.py +++ b/dns/ttl.py @@ -17,8 +17,6 @@ """DNS TTL conversion.""" -from typing import Union - import dns.exception # Technically TTLs are supposed to be between 0 and 2**31 - 1, with values @@ -83,7 +81,7 @@ def from_text(text: str) -> int: return total -def make(value: Union[int, str]) -> int: +def make(value: int | str) -> int: if isinstance(value, int): return value elif isinstance(value, str): diff --git a/dns/update.py b/dns/update.py index cbf20797..0e4aee41 100644 --- a/dns/update.py +++ b/dns/update.py @@ -17,7 +17,7 @@ """DNS Dynamic Update Support""" -from typing import Any, List, Optional, Union +from typing import Any, List import dns.enum import dns.exception @@ -51,12 +51,12 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] def __init__( self, - zone: Optional[Union[dns.name.Name, str]] = None, + zone: dns.name.Name | str | None = None, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, - keyring: Optional[Any] = None, - keyname: Optional[dns.name.Name] = None, - keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, - id: Optional[int] = None, + keyring: Any | None = None, + keyname: dns.name.Name | None = None, + keyalgorithm: dns.name.Name | str = dns.tsig.default_algorithm, + id: int | None = None, ): """Initialize a new DNS Update object. @@ -170,7 +170,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s, self.origin) self._add_rr(name, ttl, rd, section=section) - def add(self, name: Union[dns.name.Name, str], *args: Any) -> None: + def add(self, name: dns.name.Name | str, *args: Any) -> None: """Add records. The first argument is always a name. The other @@ -185,7 +185,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] self._add(False, self.update, name, *args) - def delete(self, name: Union[dns.name.Name, str], *args: Any) -> None: + def delete(self, name: dns.name.Name | str, *args: Any) -> None: """Delete records. The first argument is always a name. The other @@ -245,7 +245,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] ) self._add_rr(name, 0, rd, dns.rdataclass.NONE) - def replace(self, name: Union[dns.name.Name, str], *args: Any) -> None: + def replace(self, name: dns.name.Name | str, *args: Any) -> None: """Replace records. The first argument is always a name. The other @@ -263,7 +263,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] self._add(True, self.update, name, *args) - def present(self, name: Union[dns.name.Name, str], *args: Any) -> None: + def present(self, name: dns.name.Name | str, *args: Any) -> None: """Require that an owner name (and optionally an rdata type, or specific rdataset) exists as a prerequisite to the execution of the update. @@ -318,8 +318,8 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] def absent( self, - name: Union[dns.name.Name, str], - rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None, + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str | None = None, ) -> None: """Require that an owner name (and optionally an rdata type) does not exist as a prerequisite to the execution of the update.""" diff --git a/dns/versioned.py b/dns/versioned.py index 260eea1b..3644711e 100644 --- a/dns/versioned.py +++ b/dns/versioned.py @@ -4,10 +4,9 @@ import collections import threading -from typing import Callable, Deque, Optional, Set, Union, cast +from typing import Callable, Deque, Set, cast import dns.exception -import dns.immutable import dns.name import dns.node import dns.rdataclass @@ -45,10 +44,10 @@ class Zone(dns.zone.Zone): # lgtm[py/missing-equals] def __init__( self, - origin: Optional[Union[dns.name.Name, str]], + origin: dns.name.Name | str | None, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, relativize: bool = True, - pruning_policy: Optional[Callable[["Zone", Version], Optional[bool]]] = None, + pruning_policy: Callable[["Zone", Version], bool | None] | None = None, ): """Initialize a versioned zone object. @@ -72,8 +71,8 @@ class Zone(dns.zone.Zone): # lgtm[py/missing-equals] self._pruning_policy = self._default_pruning_policy else: self._pruning_policy = pruning_policy - self._write_txn: Optional[Transaction] = None - self._write_event: Optional[threading.Event] = None + self._write_txn: Transaction | None = None + self._write_event: threading.Event | None = None self._write_waiters: Deque[threading.Event] = collections.deque() self._readers: Set[Transaction] = set() self._commit_version_unlocked( @@ -81,7 +80,7 @@ class Zone(dns.zone.Zone): # lgtm[py/missing-equals] ) def reader( - self, id: Optional[int] = None, serial: Optional[int] = None + self, id: int | None = None, serial: int | None = None ) -> Transaction: # pylint: disable=arguments-differ if id is not None and serial is not None: raise ValueError("cannot specify both id and serial") @@ -197,7 +196,7 @@ class Zone(dns.zone.Zone): # lgtm[py/missing-equals] ): self._versions.popleft() - def set_max_versions(self, max_versions: Optional[int]) -> None: + def set_max_versions(self, max_versions: int | None) -> None: """Set a pruning policy that retains up to the specified number of versions """ @@ -216,7 +215,7 @@ class Zone(dns.zone.Zone): # lgtm[py/missing-equals] self.set_pruning_policy(policy) def set_pruning_policy( - self, policy: Optional[Callable[["Zone", Version], Optional[bool]]] + self, policy: Callable[["Zone", Version], bool | None] | None ) -> None: """Set the pruning policy for the zone. @@ -271,20 +270,20 @@ class Zone(dns.zone.Zone): # lgtm[py/missing-equals] return id def find_node( - self, name: Union[dns.name.Name, str], create: bool = False + self, name: dns.name.Name | str, create: bool = False ) -> dns.node.Node: if create: raise UseTransaction return super().find_node(name) - def delete_node(self, name: Union[dns.name.Name, str]) -> None: + def delete_node(self, name: dns.name.Name | str) -> None: raise UseTransaction def find_rdataset( self, - name: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, create: bool = False, ) -> dns.rdataset.Rdataset: if create: @@ -294,11 +293,11 @@ class Zone(dns.zone.Zone): # lgtm[py/missing-equals] def get_rdataset( self, - name: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, create: bool = False, - ) -> Optional[dns.rdataset.Rdataset]: + ) -> dns.rdataset.Rdataset | None: if create: raise UseTransaction rdataset = super().get_rdataset(name, rdtype, covers) @@ -309,13 +308,13 @@ class Zone(dns.zone.Zone): # lgtm[py/missing-equals] def delete_rdataset( self, - name: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, ) -> None: raise UseTransaction def replace_rdataset( - self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset + self, name: dns.name.Name | str, replacement: dns.rdataset.Rdataset ) -> None: raise UseTransaction diff --git a/dns/xfr.py b/dns/xfr.py index d17dd484..219fdc8c 100644 --- a/dns/xfr.py +++ b/dns/xfr.py @@ -15,7 +15,7 @@ # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -from typing import Any, List, Optional, Tuple, Union, cast +from typing import Any, List, Tuple, cast import dns.edns import dns.exception @@ -62,7 +62,7 @@ class Inbound: self, txn_manager: dns.transaction.TransactionManager, rdtype: dns.rdatatype.RdataType = dns.rdatatype.AXFR, - serial: Optional[int] = None, + serial: int | None = None, is_udp: bool = False, ): """Initialize an inbound zone transfer. @@ -78,7 +78,7 @@ class Inbound: XFR. """ self.txn_manager = txn_manager - self.txn: Optional[dns.transaction.Transaction] = None + self.txn: dns.transaction.Transaction | None = None self.rdtype = rdtype if rdtype == dns.rdatatype.IXFR: if serial is None: @@ -93,7 +93,7 @@ class Inbound: self.serial = serial self.is_udp = is_udp (_, _, self.origin) = txn_manager.origin_information() - self.soa_rdataset: Optional[dns.rdataset.Rdataset] = None + self.soa_rdataset: dns.rdataset.Rdataset | None = None self.done = False self.expecting_SOA = False self.delete_mode = False @@ -260,16 +260,16 @@ class Inbound: def make_query( txn_manager: dns.transaction.TransactionManager, - serial: Optional[int] = 0, - use_edns: Optional[Union[int, bool]] = None, - ednsflags: Optional[int] = None, - payload: Optional[int] = None, - request_payload: Optional[int] = None, - options: Optional[List[dns.edns.Option]] = None, + serial: int | None = 0, + use_edns: int | bool | None = None, + ednsflags: int | None = None, + payload: int | None = None, + request_payload: int | None = None, + options: List[dns.edns.Option] | None = None, keyring: Any = None, - keyname: Optional[dns.name.Name] = None, - keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, -) -> Tuple[dns.message.QueryMessage, Optional[int]]: + keyname: dns.name.Name | None = None, + keyalgorithm: dns.name.Name | str = dns.tsig.default_algorithm, +) -> Tuple[dns.message.QueryMessage, int | None]: """Make an AXFR or IXFR query. *txn_manager* is a ``dns.transaction.TransactionManager``, typically a @@ -333,7 +333,7 @@ def make_query( return (q, serial) -def extract_serial_from_query(query: dns.message.Message) -> Optional[int]: +def extract_serial_from_query(query: dns.message.Message) -> int | None: """Extract the SOA serial number from query if it is an IXFR and return it, otherwise return None. diff --git a/dns/zone.py b/dns/zone.py index 05170fe8..f916ffee 100644 --- a/dns/zone.py +++ b/dns/zone.py @@ -28,10 +28,8 @@ from typing import ( Iterator, List, MutableMapping, - Optional, Set, Tuple, - Union, cast, ) @@ -88,7 +86,7 @@ class DigestVerificationFailure(dns.exception.DNSException): def _validate_name( name: dns.name.Name, - origin: Optional[dns.name.Name], + origin: dns.name.Name | None, relativize: bool, ) -> dns.name.Name: # This name validation code is shared by Zone and Version @@ -133,14 +131,14 @@ class Zone(dns.transaction.TransactionManager): map_factory: Callable[[], MutableMapping[dns.name.Name, dns.node.Node]] = dict # We only require the version types as "Version" to allow for flexibility, as # only the version protocol matters - writable_version_factory: Optional[Callable[["Zone", bool], "Version"]] = None - immutable_version_factory: Optional[Callable[["Version"], "Version"]] = None + writable_version_factory: Callable[["Zone", bool], "Version"] | None = None + immutable_version_factory: Callable[["Version"], "Version"] | None = None __slots__ = ["rdclass", "origin", "nodes", "relativize"] def __init__( self, - origin: Optional[Union[dns.name.Name, str]], + origin: dns.name.Name | str | None, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, relativize: bool = True, ): @@ -193,7 +191,7 @@ class Zone(dns.transaction.TransactionManager): return not self.__eq__(other) - def _validate_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name: + def _validate_name(self, name: dns.name.Name | str) -> dns.name.Name: # Note that any changes in this method should have corresponding changes # made in the Version _validate_name() method. if isinstance(name, str): @@ -235,7 +233,7 @@ class Zone(dns.transaction.TransactionManager): return key in self.nodes def find_node( - self, name: Union[dns.name.Name, str], create: bool = False + self, name: dns.name.Name | str, create: bool = False ) -> dns.node.Node: """Find a node in the zone, possibly creating it. @@ -263,8 +261,8 @@ class Zone(dns.transaction.TransactionManager): return node def get_node( - self, name: Union[dns.name.Name, str], create: bool = False - ) -> Optional[dns.node.Node]: + self, name: dns.name.Name | str, create: bool = False + ) -> dns.node.Node | None: """Get a node in the zone, possibly creating it. This method is like ``find_node()``, except it returns None instead @@ -288,7 +286,7 @@ class Zone(dns.transaction.TransactionManager): node = None return node - def delete_node(self, name: Union[dns.name.Name, str]) -> None: + def delete_node(self, name: dns.name.Name | str) -> None: """Delete the specified node if it exists. *name*: the name of the node to find. @@ -305,9 +303,9 @@ class Zone(dns.transaction.TransactionManager): def find_rdataset( self, - name: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, create: bool = False, ) -> dns.rdataset.Rdataset: """Look for an rdataset with the specified name and type in the zone, @@ -352,11 +350,11 @@ class Zone(dns.transaction.TransactionManager): def get_rdataset( self, - name: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, create: bool = False, - ) -> Optional[dns.rdataset.Rdataset]: + ) -> dns.rdataset.Rdataset | None: """Look for an rdataset with the specified name and type in the zone. This method is like ``find_rdataset()``, except it returns None instead @@ -400,9 +398,9 @@ class Zone(dns.transaction.TransactionManager): def delete_rdataset( self, - name: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, ) -> None: """Delete the rdataset matching *rdtype* and *covers*, if it exists at the node specified by *name*. @@ -437,7 +435,7 @@ class Zone(dns.transaction.TransactionManager): self.delete_node(name) def replace_rdataset( - self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset + self, name: dns.name.Name | str, replacement: dns.rdataset.Rdataset ) -> None: """Replace an rdataset at name. @@ -464,9 +462,9 @@ class Zone(dns.transaction.TransactionManager): def find_rrset( self, - name: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, ) -> dns.rrset.RRset: """Look for an rdataset with the specified name and type in the zone, and return an RRset encapsulating it. @@ -516,10 +514,10 @@ class Zone(dns.transaction.TransactionManager): def get_rrset( self, - name: Union[dns.name.Name, str], - rdtype: Union[dns.rdatatype.RdataType, str], - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, - ) -> Optional[dns.rrset.RRset]: + name: dns.name.Name | str, + rdtype: dns.rdatatype.RdataType | str, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, + ) -> dns.rrset.RRset | None: """Look for an rdataset with the specified name and type in the zone, and return an RRset encapsulating it. @@ -562,8 +560,8 @@ class Zone(dns.transaction.TransactionManager): def iterate_rdatasets( self, - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY, - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.ANY, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, ) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]: """Return a generator which yields (name, rdataset) tuples for all rdatasets in the zone which have the specified *rdtype* @@ -594,8 +592,8 @@ class Zone(dns.transaction.TransactionManager): def iterate_rdatas( self, - rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY, - covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE, + rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.ANY, + covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE, ) -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]: """Return a generator which yields (name, ttl, rdata) tuples for all rdatas in the zone which have the specified *rdtype* @@ -630,7 +628,7 @@ class Zone(dns.transaction.TransactionManager): f: Any, sorted: bool = True, relativize: bool = True, - nl: Optional[str] = None, + nl: str | None = None, want_comments: bool = False, want_origin: bool = False, ) -> None: @@ -718,7 +716,7 @@ class Zone(dns.transaction.TransactionManager): self, sorted: bool = True, relativize: bool = True, - nl: Optional[str] = None, + nl: str | None = None, want_comments: bool = False, want_origin: bool = False, ) -> str: @@ -773,7 +771,7 @@ class Zone(dns.transaction.TransactionManager): raise NoNS def get_soa( - self, txn: Optional[dns.transaction.Transaction] = None + self, txn: dns.transaction.Transaction | None = None ) -> dns.rdtypes.ANY.SOA.SOA: """Get the zone SOA rdata. @@ -789,7 +787,7 @@ class Zone(dns.transaction.TransactionManager): # an SOA if there is no origin. raise NoSOA origin_name = self.origin - soa_rds: Optional[dns.rdataset.Rdataset] + soa_rds: dns.rdataset.Rdataset | None if txn: soa_rds = txn.get(origin_name, dns.rdatatype.SOA) else: @@ -846,9 +844,9 @@ class Zone(dns.transaction.TransactionManager): ) def verify_digest( - self, zonemd: Optional[dns.rdtypes.ANY.ZONEMD.ZONEMD] = None + self, zonemd: dns.rdtypes.ANY.ZONEMD.ZONEMD | None = None ) -> None: - digests: Union[dns.rdataset.Rdataset, List[dns.rdtypes.ANY.ZONEMD.ZONEMD]] + digests: dns.rdataset.Rdataset | List[dns.rdtypes.ANY.ZONEMD.ZONEMD] if zonemd: digests = [zonemd] else: @@ -878,8 +876,8 @@ class Zone(dns.transaction.TransactionManager): def origin_information( self, - ) -> Tuple[Optional[dns.name.Name], bool, Optional[dns.name.Name]]: - effective: Optional[dns.name.Name] + ) -> Tuple[dns.name.Name | None, bool, dns.name.Name | None]: + effective: dns.name.Name | None if self.relativize: effective = dns.name.empty else: @@ -950,7 +948,7 @@ class ImmutableVersionedNode(VersionedNode): rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, create: bool = False, - ) -> Optional[dns.rdataset.Rdataset]: + ) -> dns.rdataset.Rdataset | None: if create: raise TypeError("immutable") return super().get_rdataset(rdclass, rdtype, covers, False) @@ -975,8 +973,8 @@ class Version: self, zone: Zone, id: int, - nodes: Optional[MutableMapping[dns.name.Name, dns.node.Node]] = None, - origin: Optional[dns.name.Name] = None, + nodes: MutableMapping[dns.name.Name, dns.node.Node] | None = None, + origin: dns.name.Name | None = None, ): self.zone = zone self.id = id @@ -989,7 +987,7 @@ class Version: def _validate_name(self, name: dns.name.Name) -> dns.name.Name: return _validate_name(name, self.origin, self.zone.relativize) - def get_node(self, name: dns.name.Name) -> Optional[dns.node.Node]: + def get_node(self, name: dns.name.Name) -> dns.node.Node | None: name = self._validate_name(name) return self.nodes.get(name) @@ -998,7 +996,7 @@ class Version: name: dns.name.Name, rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType, - ) -> Optional[dns.rdataset.Rdataset]: + ) -> dns.rdataset.Rdataset | None: node = self.get_node(name) if node is None: return None @@ -1211,15 +1209,15 @@ class Transaction(dns.transaction.Transaction): def _from_text( text: Any, - origin: Optional[Union[dns.name.Name, str]] = None, + origin: dns.name.Name | str | None = None, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, relativize: bool = True, zone_factory: Any = Zone, - filename: Optional[str] = None, + filename: str | None = None, allow_include: bool = False, check_origin: bool = True, - idna_codec: Optional[dns.name.IDNACodec] = None, - allow_directives: Union[bool, Iterable[str]] = True, + idna_codec: dns.name.IDNACodec | None = None, + allow_directives: bool | Iterable[str] = True, ) -> Zone: # See the comments for the public APIs from_text() and from_file() for # details. @@ -1253,15 +1251,15 @@ def _from_text( def from_text( text: str, - origin: Optional[Union[dns.name.Name, str]] = None, + origin: dns.name.Name | str | None = None, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, relativize: bool = True, zone_factory: Any = Zone, - filename: Optional[str] = None, + filename: str | None = None, allow_include: bool = False, check_origin: bool = True, - idna_codec: Optional[dns.name.IDNACodec] = None, - allow_directives: Union[bool, Iterable[str]] = True, + idna_codec: dns.name.IDNACodec | None = None, + allow_directives: bool | Iterable[str] = True, ) -> Zone: """Build a zone object from a zone file format string. @@ -1327,15 +1325,15 @@ def from_text( def from_file( f: Any, - origin: Optional[Union[dns.name.Name, str]] = None, + origin: dns.name.Name | str | None = None, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, relativize: bool = True, zone_factory: Any = Zone, - filename: Optional[str] = None, + filename: str | None = None, allow_include: bool = True, check_origin: bool = True, - idna_codec: Optional[dns.name.IDNACodec] = None, - allow_directives: Union[bool, Iterable[str]] = True, + idna_codec: dns.name.IDNACodec | None = None, + allow_directives: bool | Iterable[str] = True, ) -> Zone: """Read a zone file and build a zone object. diff --git a/dns/zonefile.py b/dns/zonefile.py index f7bcb12c..7a81454b 100644 --- a/dns/zonefile.py +++ b/dns/zonefile.py @@ -19,7 +19,7 @@ import re import sys -from typing import Any, Iterable, List, Optional, Set, Tuple, Union, cast +from typing import Any, Iterable, List, Set, Tuple, cast import dns.exception import dns.grange @@ -68,9 +68,9 @@ def _check_cname_and_other_data(txn, name, rdataset): SavedStateType = Tuple[ dns.tokenizer.Tokenizer, - Optional[dns.name.Name], # current_origin - Optional[dns.name.Name], # last_name - Optional[Any], # current_file + dns.name.Name | None, # current_origin + dns.name.Name | None, # last_name + Any | None, # current_file int, # last_ttl bool, # last_ttl_known int, # default_ttl @@ -94,12 +94,12 @@ class Reader: rdclass: dns.rdataclass.RdataClass, txn: dns.transaction.Transaction, allow_include: bool = False, - allow_directives: Union[bool, Iterable[str]] = True, - force_name: Optional[dns.name.Name] = None, - force_ttl: Optional[int] = None, - force_rdclass: Optional[dns.rdataclass.RdataClass] = None, - force_rdtype: Optional[dns.rdatatype.RdataType] = None, - default_ttl: Optional[int] = None, + allow_directives: bool | Iterable[str] = True, + force_name: dns.name.Name | None = None, + force_ttl: int | None = None, + force_rdclass: dns.rdataclass.RdataClass | None = None, + force_rdtype: dns.rdatatype.RdataType | None = None, + default_ttl: int | None = None, ): self.tok = tok (self.zone_origin, self.relativize, _) = txn.manager.origin_information() @@ -118,7 +118,7 @@ class Reader: self.zone_rdclass = rdclass self.txn = txn self.saved_state: List[SavedStateType] = [] - self.current_file: Optional[Any] = None + self.current_file: Any | None = None self.allowed_directives: Set[str] if allow_directives is True: self.allowed_directives = {"$GENERATE", "$ORIGIN", "$TTL"} @@ -515,7 +515,7 @@ class Reader: token = self.tok.get() filename = token.value token = self.tok.get() - new_origin: Optional[dns.name.Name] + new_origin: dns.name.Name | None if token.is_identifier(): new_origin = dns.name.from_text( token.value, self.current_origin, self.tok.idna_codec @@ -630,7 +630,7 @@ class RRsetsReaderTransaction(dns.transaction.Transaction): class RRSetsReaderManager(dns.transaction.TransactionManager): def __init__( self, - origin: Optional[dns.name.Name] = dns.name.root, + origin: dns.name.Name | None = dns.name.root, relativize: bool = False, rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, ): @@ -662,14 +662,14 @@ class RRSetsReaderManager(dns.transaction.TransactionManager): def read_rrsets( text: Any, - name: Optional[Union[dns.name.Name, str]] = None, - ttl: Optional[int] = None, - rdclass: Optional[Union[dns.rdataclass.RdataClass, str]] = dns.rdataclass.IN, - default_rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, - rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None, - default_ttl: Optional[Union[int, str]] = None, - idna_codec: Optional[dns.name.IDNACodec] = None, - origin: Optional[Union[dns.name.Name, str]] = dns.name.root, + name: dns.name.Name | str | None = None, + ttl: int | None = None, + rdclass: dns.rdataclass.RdataClass | str | None = dns.rdataclass.IN, + default_rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN, + rdtype: dns.rdatatype.RdataType | str | None = None, + default_ttl: int | str | None = None, + idna_codec: dns.name.IDNACodec | None = None, + origin: dns.name.Name | str | None = dns.name.root, relativize: bool = False, ) -> List[dns.rrset.RRset]: """Read one or more rrsets from the specified text, possibly subject diff --git a/pyproject.toml b/pyproject.toml index 472fe04e..afaef597 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -105,16 +105,11 @@ lint.select = [ lint.ignore = [ "E501", "E741", - "F401", "I001", "B904", "B011", "UP006", "UP035", - # these three are about post-3.9 type syntax and should be revisited after 2.8 - "UP045", - "UP007", - "UP038", ] lint.exclude = ["tests/*"]