# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import os
-from typing import Optional, Tuple, Union
+from typing import Tuple
def convert_verify_to_cafile_and_capath(
- verify: Union[bool, str],
-) -> Tuple[Optional[str], Optional[str]]:
- cafile: Optional[str] = None
- capath: Optional[str] = None
+ verify: bool | str,
+) -> Tuple[str | None, str | None]:
+ cafile: str | None = None
+ capath: str | None = None
if isinstance(verify, str):
if os.path.isfile(verify):
cafile = verify
import struct
import time
import urllib.parse
-from typing import Any, Dict, Optional, Tuple, Union, cast
+from typing import Any, Dict, Optional, Tuple, cast
import dns.asyncbackend
import dns.exception
async def send_udp(
sock: dns.asyncbackend.DatagramSocket,
- what: Union[dns.message.Message, bytes],
+ what: dns.message.Message | bytes,
destination: Any,
- expiration: Optional[float] = None,
+ expiration: float | None = None,
) -> Tuple[int, float]:
"""Send a DNS message to the specified UDP socket.
async def receive_udp(
sock: dns.asyncbackend.DatagramSocket,
- destination: Optional[Any] = None,
- expiration: Optional[float] = None,
+ destination: Any | None = None,
+ expiration: float | None = None,
ignore_unexpected: bool = False,
one_rr_per_rrset: bool = False,
- keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None,
- request_mac: Optional[bytes] = b"",
+ keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ request_mac: bytes | None = b"",
ignore_trailing: bool = False,
raise_on_truncation: bool = False,
ignore_errors: bool = False,
- query: Optional[dns.message.Message] = None,
+ query: dns.message.Message | None = None,
) -> Any:
"""Read a DNS message from a UDP socket.
async def udp(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 53,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
ignore_unexpected: bool = False,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
raise_on_truncation: bool = False,
- sock: Optional[dns.asyncbackend.DatagramSocket] = None,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ sock: dns.asyncbackend.DatagramSocket | None = None,
+ backend: dns.asyncbackend.Backend | None = None,
ignore_errors: bool = False,
) -> dns.message.Message:
"""Return the response obtained after sending a query via UDP.
async def udp_with_fallback(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 53,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
ignore_unexpected: bool = False,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- udp_sock: Optional[dns.asyncbackend.DatagramSocket] = None,
- tcp_sock: Optional[dns.asyncbackend.StreamSocket] = None,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ udp_sock: dns.asyncbackend.DatagramSocket | None = None,
+ tcp_sock: dns.asyncbackend.StreamSocket | None = None,
+ backend: dns.asyncbackend.Backend | None = None,
ignore_errors: bool = False,
) -> Tuple[dns.message.Message, bool]:
"""Return the response to the query, trying UDP first and falling back
async def send_tcp(
sock: dns.asyncbackend.StreamSocket,
- what: Union[dns.message.Message, bytes],
- expiration: Optional[float] = None,
+ what: dns.message.Message | bytes,
+ expiration: float | None = None,
) -> Tuple[int, float]:
"""Send a DNS message to the specified TCP socket.
async def receive_tcp(
sock: dns.asyncbackend.StreamSocket,
- expiration: Optional[float] = None,
+ expiration: float | None = None,
one_rr_per_rrset: bool = False,
- keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None,
- request_mac: Optional[bytes] = b"",
+ keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ request_mac: bytes | None = b"",
ignore_trailing: bool = False,
) -> Tuple[dns.message.Message, float]:
"""Read a DNS message from a TCP socket.
async def tcp(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 53,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- sock: Optional[dns.asyncbackend.StreamSocket] = None,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ sock: dns.asyncbackend.StreamSocket | None = None,
+ backend: dns.asyncbackend.Backend | None = None,
) -> dns.message.Message:
"""Return the response obtained after sending a query via TCP.
async def tls(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 853,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- sock: Optional[dns.asyncbackend.StreamSocket] = None,
- backend: Optional[dns.asyncbackend.Backend] = None,
- ssl_context: Optional[ssl.SSLContext] = None,
- server_hostname: Optional[str] = None,
- verify: Union[bool, str] = True,
+ sock: dns.asyncbackend.StreamSocket | None = None,
+ backend: dns.asyncbackend.Backend | None = None,
+ ssl_context: ssl.SSLContext | None = None,
+ server_hostname: str | None = None,
+ verify: bool | str = True,
) -> dns.message.Message:
"""Return the response obtained after sending a query via TLS.
async def https(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 443,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0, # pylint: disable=W0613
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
client: Optional["httpx.AsyncClient|dns.quic.AsyncQuicConnection"] = None,
path: str = "/dns-query",
post: bool = True,
- verify: Union[bool, str, ssl.SSLContext] = True,
- bootstrap_address: Optional[str] = None,
+ verify: bool | str | ssl.SSLContext = True,
+ bootstrap_address: str | None = None,
resolver: Optional["dns.asyncresolver.Resolver"] = None, # pyright: ignore
family: int = socket.AF_UNSPEC,
http_version: HTTPVersion = HTTPVersion.DEFAULT,
q: dns.message.Message,
where: str,
url: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 443,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- verify: Union[bool, str, ssl.SSLContext] = True,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ verify: bool | str | ssl.SSLContext = True,
+ backend: dns.asyncbackend.Backend | None = None,
post: bool = True,
- connection: Optional[dns.quic.AsyncQuicConnection] = None,
+ connection: dns.quic.AsyncQuicConnection | None = None,
) -> dns.message.Message:
if not dns.quic.have_quic:
raise NoDOH("DNS-over-HTTP3 is not available.") # pragma: no cover
async def quic(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 853,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- connection: Optional[dns.quic.AsyncQuicConnection] = None,
- verify: Union[bool, str] = True,
- backend: Optional[dns.asyncbackend.Backend] = None,
- hostname: Optional[str] = None,
- server_hostname: Optional[str] = None,
+ connection: dns.quic.AsyncQuicConnection | None = None,
+ verify: bool | str = True,
+ backend: dns.asyncbackend.Backend | None = None,
+ hostname: str | None = None,
+ server_hostname: str | None = None,
) -> dns.message.Message:
"""Return the response obtained after sending an asynchronous query via
DNS-over-QUIC.
txn_manager: dns.transaction.TransactionManager,
s: dns.asyncbackend.Socket,
query: dns.message.Message,
- serial: Optional[int],
- timeout: Optional[float],
+ serial: int | None,
+ timeout: float | None,
expiration: float,
) -> Any:
"""Given a socket, does the zone transfer."""
with dns.xfr.Inbound(txn_manager, rdtype, serial, is_udp) as inbound:
done = False
tsig_ctx = None
- r: Optional[dns.message.Message] = None
+ r: dns.message.Message | None = None
while not done:
(_, mexpiration) = _compute_times(timeout)
if mexpiration is None or (
async def inbound_xfr(
where: str,
txn_manager: dns.transaction.TransactionManager,
- query: Optional[dns.message.Message] = None,
+ query: dns.message.Message | None = None,
port: int = 53,
- timeout: Optional[float] = None,
- lifetime: Optional[float] = None,
- source: Optional[str] = None,
+ timeout: float | None = None,
+ lifetime: float | None = None,
+ source: str | None = None,
source_port: int = 0,
udp_mode: UDPMode = UDPMode.NEVER,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ backend: dns.asyncbackend.Backend | None = None,
) -> None:
"""Conduct an inbound transfer and apply it via a transaction from the
txn_manager.
import socket
import time
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List
import dns._ddr
import dns.asyncbackend
async def resolve(
self,
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
tcp: bool = False,
- source: Optional[str] = None,
+ source: str | None = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
- lifetime: Optional[float] = None,
- search: Optional[bool] = None,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ lifetime: float | None = None,
+ search: bool | None = None,
+ backend: dns.asyncbackend.Backend | None = None,
) -> dns.resolver.Answer:
"""Query nameservers asynchronously to find the answer to the question.
async def resolve_name(
self,
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
family: int = socket.AF_UNSPEC,
**kwargs: Any,
) -> dns.resolver.HostAnswers:
# pylint: disable=redefined-outer-name
- async def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
+ async def canonical_name(self, name: dns.name.Name | str) -> dns.name.Name:
"""Determine the canonical name of *name*.
The canonical name is the name the resolver uses for queries
async def resolve(
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
tcp: bool = False,
- source: Optional[str] = None,
+ source: str | None = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
- lifetime: Optional[float] = None,
- search: Optional[bool] = None,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ lifetime: float | None = None,
+ search: bool | None = None,
+ backend: dns.asyncbackend.Backend | None = None,
) -> dns.resolver.Answer:
"""Query nameservers asynchronously to find the answer to the question.
async def resolve_name(
- name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any
+ name: dns.name.Name | str, family: int = socket.AF_UNSPEC, **kwargs: Any
) -> dns.resolver.HostAnswers:
"""Use a resolver to asynchronously query for address records.
return await get_default_resolver().resolve_name(name, family, **kwargs)
-async def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name:
+async def canonical_name(name: dns.name.Name | str) -> dns.name.Name:
"""Determine the canonical name of *name*.
See :py:func:`dns.resolver.Resolver.canonical_name` for more
async def zone_for_name(
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
tcp: bool = False,
- resolver: Optional[Resolver] = None,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ resolver: Resolver | None = None,
+ backend: dns.asyncbackend.Backend | None = None,
) -> dns.name.Name:
"""Find the name of the zone which contains the specified name.
async def make_resolver_at(
- where: Union[dns.name.Name, str],
+ where: dns.name.Name | str,
port: int = 53,
family: int = socket.AF_UNSPEC,
- resolver: Optional[Resolver] = None,
+ resolver: Resolver | None = None,
) -> Resolver:
"""Make a stub resolver using the specified destination as the full resolver.
"""
if resolver is None:
resolver = get_default_resolver()
- nameservers: List[Union[str, dns.nameserver.Nameserver]] = []
+ nameservers: List[str | dns.nameserver.Nameserver] = []
if isinstance(where, str) and dns.inet.is_address(where):
nameservers.append(dns.nameserver.Do53Nameserver(where, port))
else:
async def resolve_at(
- where: Union[dns.name.Name, str],
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ where: dns.name.Name | str,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
tcp: bool = False,
- source: Optional[str] = None,
+ source: str | None = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
- lifetime: Optional[float] = None,
- search: Optional[bool] = None,
- backend: Optional[dns.asyncbackend.Backend] = None,
+ lifetime: float | None = None,
+ search: bool | None = None,
+ backend: dns.asyncbackend.Backend | None = None,
port: int = 53,
family: int = socket.AF_UNSPEC,
- resolver: Optional[Resolver] = None,
+ resolver: Resolver | None = None,
) -> dns.resolver.Answer:
"""Query nameservers to find the answer to the question.
child = self.maybe_cow_child(i)
return child._get_node(key)
- def get(self, key: KT) -> Optional[ET]:
+ def get(self, key: KT) -> ET | None:
"""Get the element associated with *key* or return ``None``"""
i, equal = self.search_in_node(key)
if equal:
if not left.try_right_steal(self, index - 1):
break
- def insert_nonfull(self, element: ET, in_order: bool) -> Optional[ET]:
+ def insert_nonfull(self, element: ET, in_order: bool) -> ET | None:
assert not self.is_maximal()
while True:
key = element.key()
left.merge(parent, index - 1)
def delete(
- self, key: KT, parent: Optional["_Node[KT, ET]"], exact: Optional[ET]
- ) -> Optional[ET]:
+ self, key: KT, parent: Optional["_Node[KT, ET]"], exact: ET | None
+ ) -> ET | None:
"""Delete an element matching *key* if it exists. If *exact* is not ``None``
then it must be an exact match with that element. The Node must not be
minimal unless it is the root."""
def __init__(self, btree: "BTree[KT, ET]"):
self.btree = btree
- self.current_node: Optional[_Node] = None
+ self.current_node: _Node | None = None
# The current index is the element index within the current node, or
# if there is no current node then it is 0 on the left boundary and 1
# on the right boundary.
self.increasing = True
self.parents: list[tuple[_Node, int]] = []
self.parked = False
- self.parking_key: Optional[KT] = None
+ self.parking_key: KT | None = None
self.parking_key_read = False
def _seek_least(self) -> None:
self.parked = False
self.parking_key = None
- def prev(self) -> Optional[ET]:
+ def prev(self) -> ET | None:
"""Get the previous element, or return None if on the left boundary."""
self._maybe_unpark()
self.parking_key = None
self.current_index = 0
return None
- def next(self) -> Optional[ET]:
+ def next(self) -> ET | None:
"""Get the next element, or return None if on the right boundary."""
self._maybe_unpark()
self.parking_key = None
# delete_key() so that BTreeDict can be a proper MutableMapping and supply the
# rest of the standard mapping API.
- def insert_element(self, elt: ET, in_order: bool = False) -> Optional[ET]:
+ def insert_element(self, elt: ET, in_order: bool = False) -> ET | None:
"""Insert the element into the BTree.
If *in_order* is ``True``, then extra work will be done to make left siblings
self.size += 1
return oelt
- def get_element(self, key: KT) -> Optional[ET]:
+ def get_element(self, key: KT) -> ET | None:
"""Get the element matching *key* from the BTree, or return ``None`` if it
does not exist.
"""
return self.root.get(key)
- def _delete(self, key: KT, exact: Optional[ET]) -> Optional[ET]:
+ def _delete(self, key: KT, exact: ET | None) -> ET | None:
self._check_mutable_and_park()
cloned = self.root.maybe_cow(self.creator)
if cloned:
self.root = self.root.children[0]
return elt
- def delete_key(self, key: KT) -> Optional[ET]:
+ def delete_key(self, key: KT) -> ET | None:
"""Delete the element matching *key* from the BTree.
Returns the matching element or ``None`` if it does not exist.
"""
return self._delete(key, None)
- def delete_exact(self, element: ET) -> Optional[ET]:
+ def delete_exact(self, element: ET) -> ET | None:
"""Delete *element* from the BTree.
Returns the matching element or ``None`` if it was not in the BTree.
self,
*,
t: int = DEFAULT_T,
- original: Optional[BTree] = None,
+ original: BTree | None = None,
in_order: bool = False,
):
super().__init__(t=t, original=original)
self,
*,
t: int = DEFAULT_T,
- original: Optional[BTree] = None,
+ original: BTree | None = None,
in_order: bool = False,
):
super().__init__(t=t, original=original)
import enum
from dataclasses import dataclass
-from typing import Callable, MutableMapping, Optional, Tuple, Union, cast
+from typing import Callable, MutableMapping, Tuple, cast
import dns.btree
import dns.immutable
class Node(dns.node.Node):
__slots__ = ["flags", "id"]
- def __init__(self, flags: Optional[NodeFlags] = None):
+ def __init__(self, flags: NodeFlags | None = None):
super().__init__()
if flags is None:
# We allow optional flags rather than a default
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
create: bool = False,
- ) -> Optional[dns.rdataset.Rdataset]:
+ ) -> dns.rdataset.Rdataset | None:
if create:
raise TypeError("immutable")
return super().get_rdataset(rdclass, rdtype, covers, False)
class Delegations(dns.btree.BTreeSet[dns.name.Name]):
- def get_delegation(
- self, name: dns.name.Name
- ) -> Tuple[Optional[dns.name.Name], bool]:
+ def get_delegation(self, name: dns.name.Name) -> Tuple[dns.name.Name | None, bool]:
"""Get the delegation applicable to *name*, if it exists.
If there delegation, then return a tuple consisting of the name of
class Bounds:
name: dns.name.Name
left: dns.name.Name
- right: Optional[dns.name.Name]
+ right: dns.name.Name | None
closest_encloser: dns.name.Name
is_equal: bool
is_delegation: bool
self.delegations = version.delegations
self.delegations.make_immutable()
- def bounds(self, name: Union[dns.name.Name, str]) -> Bounds:
+ def bounds(self, name: dns.name.Name | str) -> Bounds:
"""Return the 'bounds' of *name* in its zone.
The bounds information is useful when making an authoritative response, as
Callable[[], MutableMapping[dns.name.Name, dns.node.Node]],
dns.btree.BTreeDict[dns.name.Name, Node],
)
- writable_version_factory: Optional[
- Callable[[dns.zone.Zone, bool], dns.zone.Version]
- ] = WritableVersion
- immutable_version_factory: Optional[
- Callable[[dns.zone.Version], dns.zone.Version]
- ] = ImmutableVersion
+ writable_version_factory: (
+ Callable[[dns.zone.Zone, bool], dns.zone.Version] | None
+ ) = WritableVersion
+ immutable_version_factory: Callable[[dns.zone.Version], dns.zone.Version] | None = (
+ ImmutableVersion
+ )
import struct
import time
from datetime import datetime
-from typing import Callable, Dict, List, Optional, Set, Tuple, Union, cast
+from typing import Callable, Dict, List, Set, Tuple, Union, cast
import dns._features
-import dns.exception
import dns.name
import dns.node
import dns.rdata
import dns.transaction
import dns.zone
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
-from dns.exception import ( # pylint: disable=W0611
- AlgorithmKeyMismatch,
- DeniedByPolicy,
- UnsupportedAlgorithm,
- ValidationFailure,
-)
+from dns.exception import AlgorithmKeyMismatch as AlgorithmKeyMismatch
+from dns.exception import DeniedByPolicy, UnsupportedAlgorithm, ValidationFailure
from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
from dns.rdtypes.ANY.CDS import CDS
from dns.rdtypes.ANY.DNSKEY import DNSKEY
return Algorithm.from_text(text)
-def algorithm_to_text(value: Union[Algorithm, int]) -> str:
+def algorithm_to_text(value: Algorithm | int) -> str:
"""Convert a DNSSEC algorithm value to text
*value*, a ``dns.dnssec.Algorithm``.
return Algorithm.to_text(value)
-def to_timestamp(value: Union[datetime, str, float, int]) -> int:
+def to_timestamp(value: datetime | str | float | int) -> int:
"""Convert various format to a timestamp"""
if isinstance(value, datetime):
return int(value.timestamp())
raise TypeError("Unsupported timestamp type")
-def key_id(key: Union[DNSKEY, CDNSKEY]) -> int:
+def key_id(key: DNSKEY | CDNSKEY) -> int:
"""Return the key id (a 16-bit number) for the specified key.
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``
def make_ds(
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
key: dns.rdata.Rdata,
- algorithm: Union[DSDigest, str],
- origin: Optional[dns.name.Name] = None,
- policy: Optional[Policy] = None,
+ algorithm: DSDigest | str,
+ origin: dns.name.Name | None = None,
+ policy: Policy | None = None,
validating: bool = False,
) -> DS:
"""Create a DS record for a DNSSEC key.
check = policy.ok_to_create_ds
if not check(algorithm):
raise DeniedByPolicy
- if not isinstance(key, (DNSKEY, CDNSKEY)):
- raise ValueError("key is not a DNSKEY/CDNSKEY")
+ if not isinstance(key, DNSKEY | CDNSKEY):
+ raise ValueError("key is not a DNSKEY | CDNSKEY")
if algorithm == DSDigest.SHA1:
dshash = hashlib.sha1()
elif algorithm == DSDigest.SHA256:
def make_cds(
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
key: dns.rdata.Rdata,
- algorithm: Union[DSDigest, str],
- origin: Optional[dns.name.Name] = None,
+ algorithm: DSDigest | str,
+ origin: dns.name.Name | None = None,
) -> CDS:
"""Create a CDS record for a DNSSEC key.
def _find_candidate_keys(
- keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG
-) -> Optional[List[DNSKEY]]:
+ keys: Dict[dns.name.Name, dns.rdataset.Rdataset | dns.node.Node], rrsig: RRSIG
+) -> List[DNSKEY] | None:
value = keys.get(rrsig.signer)
if isinstance(value, dns.node.Node):
rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY)
def _get_rrname_rdataset(
- rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+ rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]:
if isinstance(rrset, tuple):
return rrset[0], rrset[1]
def _validate_rrsig(
- rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+ rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
rrsig: RRSIG,
- keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
- origin: Optional[dns.name.Name] = None,
- now: Optional[float] = None,
- policy: Optional[Policy] = None,
+ keys: Dict[dns.name.Name, dns.node.Node | dns.rdataset.Rdataset],
+ origin: dns.name.Name | None = None,
+ now: float | None = None,
+ policy: Policy | None = None,
) -> None:
"""Validate an RRset against a single signature rdata, throwing an
exception if validation is not successful.
def _validate(
- rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
- rrsigset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
- keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
- origin: Optional[dns.name.Name] = None,
- now: Optional[float] = None,
- policy: Optional[Policy] = None,
+ rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
+ rrsigset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
+ keys: Dict[dns.name.Name, dns.node.Node | dns.rdataset.Rdataset],
+ origin: dns.name.Name | None = None,
+ now: float | None = None,
+ policy: Policy | None = None,
) -> None:
"""Validate an RRset against a signature RRset, throwing an exception
if none of the signatures validate.
def _sign(
- rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+ rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
private_key: PrivateKey,
signer: dns.name.Name,
dnskey: DNSKEY,
- inception: Optional[Union[datetime, str, int, float]] = None,
- expiration: Optional[Union[datetime, str, int, float]] = None,
- lifetime: Optional[int] = None,
+ inception: datetime | str | int | float | None = None,
+ expiration: datetime | str | int | float | None = None,
+ lifetime: int | None = None,
verify: bool = False,
- policy: Optional[Policy] = None,
- origin: Optional[dns.name.Name] = None,
+ policy: Policy | None = None,
+ origin: dns.name.Name | None = None,
deterministic: bool = True,
) -> RRSIG:
"""Sign RRset using private key.
def _make_rrsig_signature_data(
- rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+ rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
rrsig: RRSIG,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
) -> bytes:
"""Create signature rdata.
def _make_dnskey(
public_key: PublicKey,
- algorithm: Union[int, str],
+ algorithm: int | str,
flags: int = Flag.ZONE,
protocol: int = 3,
) -> DNSKEY:
def _make_cdnskey(
public_key: PublicKey,
- algorithm: Union[int, str],
+ algorithm: int | str,
flags: int = Flag.ZONE,
protocol: int = 3,
) -> CDNSKEY:
def nsec3_hash(
- domain: Union[dns.name.Name, str],
- salt: Optional[Union[str, bytes]],
+ domain: dns.name.Name | str,
+ salt: str | bytes | None,
iterations: int,
- algorithm: Union[int, str],
+ algorithm: int | str,
) -> str:
"""
Calculate the NSEC3 hash, according to
def make_ds_rdataset(
- rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
- algorithms: Set[Union[DSDigest, str]],
- origin: Optional[dns.name.Name] = None,
+ rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
+ algorithms: Set[DSDigest | str],
+ origin: dns.name.Name | None = None,
) -> dns.rdataset.Rdataset:
"""Create a DS record from DNSKEY/CDNSKEY/CDS.
def dnskey_rdataset_to_cds_rdataset(
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
rdataset: dns.rdataset.Rdataset,
- algorithm: Union[DSDigest, str],
- origin: Optional[dns.name.Name] = None,
+ algorithm: DSDigest | str,
+ origin: dns.name.Name | None = None,
) -> dns.rdataset.Rdataset:
"""Create a CDS record from DNSKEY/CDNSKEY.
signer: dns.name.Name,
ksks: List[Tuple[PrivateKey, DNSKEY]],
zsks: List[Tuple[PrivateKey, DNSKEY]],
- inception: Optional[Union[datetime, str, int, float]] = None,
- expiration: Optional[Union[datetime, str, int, float]] = None,
- lifetime: Optional[int] = None,
- policy: Optional[Policy] = None,
- origin: Optional[dns.name.Name] = None,
+ inception: datetime | str | int | float | None = None,
+ expiration: datetime | str | int | float | None = None,
+ lifetime: int | None = None,
+ policy: Policy | None = None,
+ origin: dns.name.Name | None = None,
deterministic: bool = True,
) -> None:
"""Default RRset signer"""
def sign_zone(
zone: dns.zone.Zone,
- txn: Optional[dns.transaction.Transaction] = None,
- keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None,
+ txn: dns.transaction.Transaction | None = None,
+ keys: List[Tuple[PrivateKey, DNSKEY]] | None = None,
add_dnskey: bool = True,
- dnskey_ttl: Optional[int] = None,
- inception: Optional[Union[datetime, str, int, float]] = None,
- expiration: Optional[Union[datetime, str, int, float]] = None,
- lifetime: Optional[int] = None,
- nsec3: Optional[NSEC3PARAM] = None,
- rrset_signer: Optional[RRsetSigner] = None,
- policy: Optional[Policy] = None,
+ dnskey_ttl: int | None = None,
+ inception: datetime | str | int | float | None = None,
+ expiration: datetime | str | int | float | None = None,
+ lifetime: int | None = None,
+ nsec3: NSEC3PARAM | None = None,
+ rrset_signer: RRsetSigner | None = None,
+ policy: Policy | None = None,
deterministic: bool = True,
) -> None:
"""Sign zone.
def _sign_zone_nsec(
zone: dns.zone.Zone,
txn: dns.transaction.Transaction,
- rrset_signer: Optional[RRsetSigner] = None,
+ rrset_signer: RRsetSigner | None = None,
) -> None:
"""NSEC zone signer"""
def _txn_add_nsec(
txn: dns.transaction.Transaction,
name: dns.name.Name,
- next_secure: Optional[dns.name.Name],
+ next_secure: dns.name.Name | None,
rdclass: dns.rdataclass.RdataClass,
ttl: int,
- rrset_signer: Optional[RRsetSigner] = None,
+ rrset_signer: RRsetSigner | None = None,
) -> None:
"""NSEC zone signer helper"""
mandatory_types = set(
if dns._features.have("dnssec"):
from cryptography.exceptions import InvalidSignature
- from cryptography.hazmat.primitives.asymmetric import dsa # pylint: disable=W0611
from cryptography.hazmat.primitives.asymmetric import ec # pylint: disable=W0611
from cryptography.hazmat.primitives.asymmetric import ed448 # pylint: disable=W0611
from cryptography.hazmat.primitives.asymmetric import rsa # pylint: disable=W0611
-from typing import Dict, Optional, Tuple, Type, Union
+from typing import Dict, Tuple, Type
import dns._features
import dns.name
else:
_have_cryptography = False
-AlgorithmPrefix = Optional[Union[bytes, dns.name.Name]]
+AlgorithmPrefix = bytes | dns.name.Name | None
algorithms: Dict[Tuple[Algorithm, AlgorithmPrefix], Type[GenericPrivateKey]] = {}
if _have_cryptography:
def get_algorithm_cls(
- algorithm: Union[int, str], prefix: AlgorithmPrefix = None
+ algorithm: int | str, prefix: AlgorithmPrefix = None
) -> Type[GenericPrivateKey]:
"""Get Private Key class from Algorithm.
def register_algorithm_cls(
- algorithm: Union[int, str],
+ algorithm: int | str,
algorithm_cls: Type[GenericPrivateKey],
- name: Optional[Union[dns.name.Name, str]] = None,
- oid: Optional[bytes] = None,
+ name: dns.name.Name | str | None = None,
+ oid: bytes | None = None,
) -> None:
"""Register Algorithm Private Key class.
from abc import ABC, abstractmethod # pylint: disable=no-name-in-module
-from typing import Any, Optional, Type
+from typing import Any, Type
import dns.rdataclass
import dns.rdatatype
@classmethod
@abstractmethod
def from_pem(
- cls, private_pem: bytes, password: Optional[bytes] = None
+ cls, private_pem: bytes, password: bytes | None = None
) -> "GenericPrivateKey":
"""Create private key from PEM-encoded PKCS#8"""
@abstractmethod
- def to_pem(self, password: Optional[bytes] = None) -> bytes:
+ def to_pem(self, password: bytes | None = None) -> bytes:
"""Return private key as PEM-encoded PKCS#8"""
-from typing import Any, Optional, Type
+from typing import Any, Type
from cryptography.hazmat.primitives import serialization
@classmethod
def from_pem(
- cls, private_pem: bytes, password: Optional[bytes] = None
+ cls, private_pem: bytes, password: bytes | None = None
) -> "GenericPrivateKey":
key = serialization.load_pem_private_key(private_pem, password=password)
return cls(key=key)
- def to_pem(self, password: Optional[bytes] = None) -> bytes:
+ def to_pem(self, password: bytes | None = None) -> bytes:
encryption_algorithm: serialization.KeySerializationEncryption
if password:
encryption_algorithm = serialization.BestAvailableEncryption(password)
"""DNS E.164 helpers."""
-from typing import Iterable, Optional, Union
+from typing import Iterable
import dns.exception
import dns.name
def from_e164(
- text: str, origin: Optional[dns.name.Name] = public_enum_domain
+ text: str, origin: dns.name.Name | None = public_enum_domain
) -> dns.name.Name:
"""Convert an E.164 number in textual form into a Name object whose
value is the ENUM domain name for that number.
def to_e164(
name: dns.name.Name,
- origin: Optional[dns.name.Name] = public_enum_domain,
+ origin: dns.name.Name | None = public_enum_domain,
want_plus_prefix: bool = True,
) -> str:
"""Convert an ENUM domain name into an E.164 number.
def query(
number: str,
- domains: Iterable[Union[dns.name.Name, str]],
- resolver: Optional[dns.resolver.Resolver] = None,
+ domains: Iterable[dns.name.Name | str],
+ resolver: dns.resolver.Resolver | None = None,
) -> dns.resolver.Answer:
"""Look for NAPTR RRs for the specified number in the specified domains.
import math
import socket
import struct
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict
import dns.enum
import dns.inet
class Option:
"""Base class for all EDNS option types."""
- def __init__(self, otype: Union[OptionType, str]):
+ def __init__(self, otype: OptionType | str):
"""Initialize an option.
*otype*, a ``dns.edns.OptionType``, is the option type.
"""
self.otype = OptionType.make(otype)
- def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
+ def to_wire(self, file: Any | None = None) -> bytes | None:
"""Convert an option to wire format.
Returns a ``bytes`` or ``None``.
implementation.
"""
- def __init__(self, otype: Union[OptionType, str], data: Union[bytes, str]):
+ def __init__(self, otype: OptionType | str, data: bytes | str):
super().__init__(otype)
self.data = dns.rdata.Rdata._as_bytes(data, True)
- def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
+ def to_wire(self, file: Any | None = None) -> bytes | None:
if file:
file.write(self.data)
return None
@classmethod
def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
+ cls, otype: OptionType | str, parser: "dns.wire.Parser"
) -> Option:
return cls(otype, parser.get_remaining())
class ECSOption(Option): # lgtm[py/missing-equals]
"""EDNS Client Subnet (ECS, RFC7871)"""
- def __init__(self, address: str, srclen: Optional[int] = None, scopelen: int = 0):
+ def __init__(self, address: str, srclen: int | None = None, scopelen: int = 0):
"""*address*, a ``str``, is the client address information.
*srclen*, an ``int``, the source prefix length, which is the
)
return ECSOption(address, srclen, scope)
- def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
+ def to_wire(self, file: Any | None = None) -> bytes | None:
value = (
struct.pack("!HBB", self.family, self.srclen, self.scopelen) + self.addrdata
)
@classmethod
def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
+ cls, otype: OptionType | str, parser: "dns.wire.Parser"
) -> Option:
family, src, scope = parser.get_struct("!HBB")
addrlen = int(math.ceil(src / 8.0))
_preserve_case = {"DNSKEY", "DS", "DNSSEC", "RRSIGs", "NSEC", "NXDOMAIN"}
- def __init__(self, code: Union[EDECode, str], text: Optional[str] = None):
+ def __init__(self, code: EDECode | str, text: str | None = None):
"""*code*, a ``dns.edns.EDECode`` or ``str``, the info code of the
extended error.
output += f": {self.text}"
return output
- def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
+ def to_wire(self, file: Any | None = None) -> bytes | None:
value = struct.pack("!H", self.code)
if self.text is not None:
value += self.text.encode("utf8")
@classmethod
def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
+ cls, otype: OptionType | str, parser: "dns.wire.Parser"
) -> Option:
code = EDECode.make(parser.get_uint16())
text = parser.get_remaining()
super().__init__(OptionType.NSID)
self.nsid = nsid
- def to_wire(self, file: Any = None) -> Optional[bytes]:
+ def to_wire(self, file: Any = None) -> bytes | None:
if file:
file.write(self.nsid)
return None
@classmethod
def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: dns.wire.Parser
+ cls, otype: OptionType | str, parser: dns.wire.Parser
) -> Option:
return cls(parser.get_remaining())
if len(server) != 0 and (len(server) < 8 or len(server) > 32):
raise ValueError("server cookie must be empty or between 8 and 32 bytes")
- def to_wire(self, file: Any = None) -> Optional[bytes]:
+ def to_wire(self, file: Any = None) -> bytes | None:
if file:
file.write(self.client)
if len(self.server) > 0:
@classmethod
def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: dns.wire.Parser
+ cls, otype: OptionType | str, parser: dns.wire.Parser
) -> Option:
return cls(parser.get_bytes(8), parser.get_remaining())
super().__init__(OptionType.REPORTCHANNEL)
self.agent_domain = agent_domain
- def to_wire(self, file: Any = None) -> Optional[bytes]:
+ def to_wire(self, file: Any = None) -> bytes | None:
return self.agent_domain.to_wire(file)
def to_text(self) -> str:
@classmethod
def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: dns.wire.Parser
+ cls, otype: OptionType | str, parser: dns.wire.Parser
) -> Option:
return cls(parser.get_name())
def option_from_wire_parser(
- otype: Union[OptionType, str], parser: "dns.wire.Parser"
+ otype: OptionType | str, parser: "dns.wire.Parser"
) -> Option:
"""Build an EDNS option object from wire format.
def option_from_wire(
- otype: Union[OptionType, str], wire: bytes, current: int, olen: int
+ otype: OptionType | str, wire: bytes, current: int, olen: int
) -> Option:
"""Build an EDNS option object from wire format.
import random
import threading
import time
-from typing import Any, Optional, Union
+from typing import Any
class EntropyPool:
# leaving this code doesn't hurt anything as the library code
# is used if present.
- def __init__(self, seed: Optional[bytes] = None):
+ def __init__(self, seed: bytes | None = None):
self.pool_index = 0
- self.digest: Optional[bytearray] = None
+ self.digest: bytearray | None = None
self.next_byte = 0
self.lock = threading.Lock()
self.hash = hashlib.sha1()
self.seeded = False
self.seed_pid = 0
- def _stir(self, entropy: Union[bytes, bytearray]) -> None:
+ def _stir(self, entropy: bytes | bytearray) -> None:
for c in entropy:
if self.pool_index == self.hash_len:
self.pool_index = 0
self.pool[self.pool_index] ^= b
self.pool_index += 1
- def stir(self, entropy: Union[bytes, bytearray]) -> None:
+ def stir(self, entropy: bytes | bytearray) -> None:
with self.lock:
self._stir(entropy)
pool = EntropyPool()
-system_random: Optional[Any]
+system_random: Any | None
try:
system_random = random.SystemRandom()
except Exception: # pragma: no cover
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import enum
-from typing import Any, Optional, Type, TypeVar, Union
+from typing import Any, Type, TypeVar
TIntEnum = TypeVar("TIntEnum", bound="IntEnum")
return text
@classmethod
- def make(cls: Type[TIntEnum], value: Union[int, str]) -> TIntEnum:
+ def make(cls: Type[TIntEnum], value: int | str) -> TIntEnum:
"""Convert text or a value into an enumerated type, if possible.
*value*, the ``int`` or ``str`` to convert.
return ""
@classmethod
- def _extra_from_text(cls, text: str) -> Optional[Any]: # pylint: disable=W0613
+ def _extra_from_text(cls, text: str) -> Any | None: # pylint: disable=W0613
return None
@classmethod
"""
-from typing import Optional, Set
+from typing import Set
class DNSException(Exception):
and ``fmt`` class variables to get nice parametrized messages.
"""
- msg: Optional[str] = None # non-parametrized message
+ msg: str | None = None # non-parametrized message
supp_kwargs: Set[str] = set() # accepted parameters for _fmt_kwargs (sanity check)
- fmt: Optional[str] = None # message parametrized with results from _fmt_kwargs
+ fmt: str | None = None # message parametrized with results from _fmt_kwargs
def __init__(self, *args, **kwargs):
self._check_params(*args, **kwargs)
"""
fmtargs = {}
for kw, data in kwargs.items():
- if isinstance(data, (list, set)):
+ if isinstance(data, list | set):
# convert list of <someobj> to list of str(<someobj>)
fmtargs[kw] = list(map(str, data))
if len(fmtargs[kw]) == 1:
"""Generic Internet address helper functions."""
import socket
-from typing import Any, Optional, Tuple
+from typing import Any, Tuple
import dns.ipv4
import dns.ipv6
return False
-def low_level_address_tuple(
- high_tuple: Tuple[str, int], af: Optional[int] = None
-) -> Any:
+def low_level_address_tuple(high_tuple: Tuple[str, int], af: int | None = None) -> Any:
"""Given a "high-level" address tuple, i.e.
an (address, port) return the appropriate "low-level" address tuple
suitable for use in socket calls.
"""IPv4 helper functions."""
import struct
-from typing import Union
import dns.exception
return f"{address[0]}.{address[1]}.{address[2]}.{address[3]}"
-def inet_aton(text: Union[str, bytes]) -> bytes:
+def inet_aton(text: str | bytes) -> bytes:
"""Convert an IPv4 address in text form to binary form.
*text*, a ``str`` or ``bytes``, the IPv4 address in textual form.
raise dns.exception.SyntaxError
-def canonicalize(text: Union[str, bytes]) -> str:
+def canonicalize(text: str | bytes) -> str:
"""Verify that *address* is a valid text form IPv4 address and return its
canonical text form.
import binascii
import re
-from typing import List, Union
+from typing import List
import dns.exception
import dns.ipv4
_colon_colon_end = re.compile(rb".*::$")
-def inet_aton(text: Union[str, bytes], ignore_scope: bool = False) -> bytes:
+def inet_aton(text: str | bytes, ignore_scope: bool = False) -> bytes:
"""Convert an IPv6 address in text form to binary form.
*text*, a ``str`` or ``bytes``, the IPv6 address in textual form.
return address.startswith(_mapped_prefix)
-def canonicalize(text: Union[str, bytes]) -> str:
+def canonicalize(text: str | bytes) -> str:
"""Verify that *address* is a valid text form IPv6 address and return its
canonical text form. Addresses with scopes are rejected.
import enum
import io
import time
-from typing import Any, Dict, List, Optional, Tuple, Union, cast
+from typing import Any, Dict, List, Tuple, cast
import dns.edns
import dns.entropy
dns.name.Name,
dns.rdataclass.RdataClass,
dns.rdatatype.RdataType,
- Optional[dns.rdatatype.RdataType],
- Optional[dns.rdataclass.RdataClass],
+ dns.rdatatype.RdataType | None,
+ dns.rdataclass.RdataClass | None,
]
IndexType = Dict[IndexKeyType, dns.rrset.RRset]
-SectionType = Union[int, str, List[dns.rrset.RRset]]
+SectionType = int | str | List[dns.rrset.RRset]
class Message:
_section_enum = MessageSection
- def __init__(self, id: Optional[int] = None):
+ def __init__(self, id: int | None = None):
if id is None:
self.id = dns.entropy.random_16()
else:
self.id = id
self.flags = 0
self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
- self.opt: Optional[dns.rrset.RRset] = None
+ self.opt: dns.rrset.RRset | None = None
self.request_payload = 0
self.pad = 0
self.keyring: Any = None
- self.tsig: Optional[dns.rrset.RRset] = None
+ self.tsig: dns.rrset.RRset | None = None
self.want_tsig_sign = False
self.request_mac = b""
self.xfr = False
- self.origin: Optional[dns.name.Name] = None
- self.tsig_ctx: Optional[Any] = None
+ self.origin: dns.name.Name | None = None
+ self.tsig_ctx: Any | None = None
self.index: IndexType = {}
self.errors: List[MessageError] = []
self.time = 0.0
- self.wire: Optional[bytes] = None
+ self.wire: bytes | None = None
@property
def question(self) -> List[dns.rrset.RRset]:
def to_text(
self,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
**kw: Dict[str, Any],
) -> str:
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
- deleting: Optional[dns.rdataclass.RdataClass] = None,
+ deleting: dns.rdataclass.RdataClass | None = None,
create: bool = False,
force_unique: bool = False,
- idna_codec: Optional[dns.name.IDNACodec] = None,
+ idna_codec: dns.name.IDNACodec | None = None,
) -> dns.rrset.RRset:
"""Find the RRset with the given attributes in the specified section.
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
- deleting: Optional[dns.rdataclass.RdataClass] = None,
+ deleting: dns.rdataclass.RdataClass | None = None,
create: bool = False,
force_unique: bool = False,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- ) -> Optional[dns.rrset.RRset]:
+ idna_codec: dns.name.IDNACodec | None = None,
+ ) -> dns.rrset.RRset | None:
"""Get the RRset with the given attributes in the specified section.
If the RRset is not found, None is returned.
def to_wire(
self,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
max_size: int = 0,
multi: bool = False,
- tsig_ctx: Optional[Any] = None,
+ tsig_ctx: Any | None = None,
prepend_length: bool = False,
prefer_truncation: bool = False,
**kw: Dict[str, Any],
def use_tsig(
self,
keyring: Any,
- keyname: Optional[Union[dns.name.Name, str]] = None,
+ keyname: dns.name.Name | str | None = None,
fudge: int = 300,
- original_id: Optional[int] = None,
+ original_id: int | None = None,
tsig_error: int = 0,
other_data: bytes = b"",
- algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
+ algorithm: dns.name.Name | str = dns.tsig.default_algorithm,
) -> None:
"""When sending, a TSIG signature using the specified key
should be added.
self.want_tsig_sign = True
@property
- def keyname(self) -> Optional[dns.name.Name]:
+ def keyname(self) -> dns.name.Name | None:
if self.tsig:
return self.tsig.name
else:
return None
@property
- def keyalgorithm(self) -> Optional[dns.name.Name]:
+ def keyalgorithm(self) -> dns.name.Name | None:
if self.tsig:
rdata = cast(dns.rdtypes.ANY.TSIG.TSIG, self.tsig[0])
return rdata.algorithm
return None
@property
- def mac(self) -> Optional[bytes]:
+ def mac(self) -> bytes | None:
if self.tsig:
rdata = cast(dns.rdtypes.ANY.TSIG.TSIG, self.tsig[0])
return rdata.mac
return None
@property
- def tsig_error(self) -> Optional[int]:
+ def tsig_error(self) -> int | None:
if self.tsig:
rdata = cast(dns.rdtypes.ANY.TSIG.TSIG, self.tsig[0])
return rdata.error
def use_edns(
self,
- edns: Optional[Union[int, bool]] = 0,
+ edns: int | bool | None = 0,
ednsflags: int = 0,
payload: int = DEFAULT_EDNS_PAYLOAD,
- request_payload: Optional[int] = None,
- options: Optional[List[dns.edns.Option]] = None,
+ request_payload: int | None = None,
+ options: List[dns.edns.Option] | None = None,
pad: int = 0,
) -> None:
"""Configure EDNS behavior.
def __init__(
self,
canonical_name: dns.name.Name,
- answer: Optional[dns.rrset.RRset],
+ answer: dns.rrset.RRset | None,
minimum_ttl: int,
cnames: List[dns.rrset.RRset],
):
def from_wire(
wire: bytes,
- keyring: Optional[Any] = None,
- request_mac: Optional[bytes] = b"",
+ keyring: Any | None = None,
+ request_mac: bytes | None = b"",
xfr: bool = False,
- origin: Optional[dns.name.Name] = None,
- tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None,
+ origin: dns.name.Name | None = None,
+ tsig_ctx: dns.tsig.HMACTSig | dns.tsig.GSSTSig | None = None,
multi: bool = False,
question_only: bool = False,
one_rr_per_rrset: bool = False,
def __init__(
self,
text: str,
- idna_codec: Optional[dns.name.IDNACodec],
+ idna_codec: dns.name.IDNACodec | None,
one_rr_per_rrset: bool = False,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
- relativize_to: Optional[dns.name.Name] = None,
+ relativize_to: dns.name.Name | None = None,
):
- self.message: Optional[Message] = None # mypy: ignore
+ self.message: Message | None = None # mypy: ignore
self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec)
self.last_name = None
self.one_rr_per_rrset = one_rr_per_rrset
def from_text(
text: str,
- idna_codec: Optional[dns.name.IDNACodec] = None,
+ idna_codec: dns.name.IDNACodec | None = None,
one_rr_per_rrset: bool = False,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
- relativize_to: Optional[dns.name.Name] = None,
+ relativize_to: dns.name.Name | None = None,
) -> Message:
"""Convert the text format message into a message object.
def from_file(
f: Any,
- idna_codec: Optional[dns.name.IDNACodec] = None,
+ idna_codec: dns.name.IDNACodec | None = None,
one_rr_per_rrset: bool = False,
) -> Message:
"""Read the next text format message from the specified file.
def make_query(
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
- use_edns: Optional[Union[int, bool]] = None,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
+ use_edns: int | bool | None = None,
want_dnssec: bool = False,
- ednsflags: Optional[int] = None,
- payload: Optional[int] = None,
- request_payload: Optional[int] = None,
- options: Optional[List[dns.edns.Option]] = None,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- id: Optional[int] = None,
+ ednsflags: int | None = None,
+ payload: int | None = None,
+ request_payload: int | None = None,
+ options: List[dns.edns.Option] | None = None,
+ idna_codec: dns.name.IDNACodec | None = None,
+ id: int | None = None,
flags: int = dns.flags.RD,
pad: int = 0,
) -> QueryMessage:
our_payload: int = 8192,
fudge: int = 300,
tsig_error: int = 0,
- pad: Optional[int] = None,
- copy_mode: Optional[CopyMode] = None,
+ pad: int | None = None,
+ copy_mode: CopyMode | None = None,
) -> Message:
"""Make a message which is a response for the specified query.
The message returned is really a response skeleton; it has all of the infrastructure
import encodings.idna # type: ignore
import functools
import struct
-from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union
+from typing import Any, Callable, Dict, Iterable, Optional, Tuple
import dns._features
import dns.enum
_escaped_text = '"().;\\@$'
-def _escapify(label: Union[bytes, str]) -> str:
+def _escapify(label: bytes | str) -> str:
"""Escape the characters in label which need it.
@returns: the escaped string
@rtype: string"""
raise EmptyLabel
-def _maybe_convert_to_binary(label: Union[bytes, str]) -> bytes:
+def _maybe_convert_to_binary(label: bytes | str) -> bytes:
"""If label is ``str``, convert it to ``bytes``. If it is already
``bytes`` just return it.
__slots__ = ["labels"]
- def __init__(self, labels: Iterable[Union[bytes, str]]):
+ def __init__(self, labels: Iterable[bytes | str]):
"""*labels* is any iterable whose values are ``str`` or ``bytes``."""
blabels = [_maybe_convert_to_binary(x) for x in labels]
return s
def to_unicode(
- self, omit_final_dot: bool = False, idna_codec: Optional[IDNACodec] = None
+ self, omit_final_dot: bool = False, idna_codec: IDNACodec | None = None
) -> str:
"""Convert name to Unicode text format.
def to_wire(
self,
- file: Optional[Any] = None,
- compress: Optional[CompressType] = None,
+ file: Any | None = None,
+ compress: CompressType | None = None,
origin: Optional["Name"] = None,
canonicalize: bool = False,
- ) -> Optional[bytes]:
+ ) -> bytes | None:
"""Convert name to wire format, possibly compressing it.
*file* is the file where the name is emitted (typically an
def from_unicode(
- text: str, origin: Optional[Name] = root, idna_codec: Optional[IDNACodec] = None
+ text: str, origin: Name | None = root, idna_codec: IDNACodec | None = None
) -> Name:
"""Convert unicode text into a Name object.
def from_text(
- text: Union[bytes, str],
- origin: Optional[Name] = root,
- idna_codec: Optional[IDNACodec] = None,
+ text: bytes | str,
+ origin: Name | None = root,
+ idna_codec: IDNACodec | None = None,
) -> Name:
"""Convert text into a Name object.
-from typing import Optional, Union
from urllib.parse import urlparse
import dns.asyncbackend
import dns.asyncquery
-import dns.inet
import dns.message
import dns.query
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool,
one_rr_per_rrset: bool = False,
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool,
backend: dns.asyncbackend.Backend,
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool,
one_rr_per_rrset: bool = False,
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool,
backend: dns.asyncbackend.Backend,
def __init__(
self,
url: str,
- bootstrap_address: Optional[str] = None,
- verify: Union[bool, str] = True,
+ bootstrap_address: str | None = None,
+ verify: bool | str = True,
want_get: bool = False,
http_version: dns.query.HTTPVersion = dns.query.HTTPVersion.DEFAULT,
):
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool = False,
one_rr_per_rrset: bool = False,
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool,
backend: dns.asyncbackend.Backend,
self,
address: str,
port: int = 853,
- hostname: Optional[str] = None,
- verify: Union[bool, str] = True,
+ hostname: str | None = None,
+ verify: bool | str = True,
):
super().__init__(address, port)
self.hostname = hostname
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool = False,
one_rr_per_rrset: bool = False,
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool,
backend: dns.asyncbackend.Backend,
self,
address: str,
port: int = 853,
- verify: Union[bool, str] = True,
- server_hostname: Optional[str] = None,
+ verify: bool | str = True,
+ server_hostname: str | None = None,
):
super().__init__(address, port)
self.verify = verify
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool = False,
one_rr_per_rrset: bool = False,
self,
request: dns.message.QueryMessage,
timeout: float,
- source: Optional[str],
+ source: str | None,
source_port: int,
max_size: bool,
backend: dns.asyncbackend.Backend,
import enum
import io
-from typing import Any, Dict, Optional
+from typing import Any, Dict
import dns.immutable
import dns.name
import dns.rdataclass
import dns.rdataset
import dns.rdatatype
-import dns.renderer
import dns.rrset
_cname_types = {
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
create: bool = False,
- ) -> Optional[dns.rdataset.Rdataset]:
+ ) -> dns.rdataset.Rdataset | None:
"""Get an rdataset matching the specified properties in the
current node.
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
create: bool = False,
- ) -> Optional[dns.rdataset.Rdataset]:
+ ) -> dns.rdataset.Rdataset | None:
if create:
raise TypeError("immutable")
return super().get_rdataset(rdclass, rdtype, covers, False)
import struct
import time
import urllib.parse
-from typing import Any, Callable, Dict, Optional, Tuple, Union, cast
+from typing import Any, Callable, Dict, Optional, Tuple, cast
import dns._features
import dns._tls_util
def default_socket_factory(
- af: Union[socket.AddressFamily, int],
+ af: socket.AddressFamily | int,
kind: socket.SocketKind,
proto: int,
) -> socket.socket:
# Function used to create a socket. Can be overridden if needed in special
# situations.
socket_factory: Callable[
- [Union[socket.AddressFamily, int], socket.SocketKind, int], socket.socket
+ [socket.AddressFamily | int, socket.SocketKind, int], socket.socket
] = default_socket_factory
def make_socket(
- af: Union[socket.AddressFamily, int],
+ af: socket.AddressFamily | int,
type: socket.SocketKind,
- source: Optional[Any] = None,
+ source: Any | None = None,
) -> socket.socket:
"""Make a socket.
def make_ssl_socket(
- af: Union[socket.AddressFamily, int],
+ af: socket.AddressFamily | int,
type: socket.SocketKind,
ssl_context: ssl.SSLContext,
- server_hostname: Optional[Union[dns.name.Name, str]] = None,
- source: Optional[Any] = None,
+ server_hostname: dns.name.Name | str | None = None,
+ source: Any | None = None,
) -> ssl.SSLSocket:
"""Make a socket.
def https(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 443,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- session: Optional[Any] = None,
+ session: Any | None = None,
path: str = "/dns-query",
post: bool = True,
- bootstrap_address: Optional[str] = None,
- verify: Union[bool, str, ssl.SSLContext] = True,
+ bootstrap_address: str | None = None,
+ verify: bool | str | ssl.SSLContext = True,
resolver: Optional["dns.resolver.Resolver"] = None, # pyright: ignore
family: int = socket.AF_UNSPEC,
http_version: HTTPVersion = HTTPVersion.DEFAULT,
q: dns.message.Message,
where: str,
url: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 443,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- verify: Union[bool, str, ssl.SSLContext] = True,
+ verify: bool | str | ssl.SSLContext = True,
post: bool = True,
- connection: Optional[dns.quic.SyncQuicConnection] = None,
+ connection: dns.quic.SyncQuicConnection | None = None,
) -> dns.message.Message:
if not dns.quic.have_quic:
raise NoDOH("DNS-over-HTTP3 is not available.") # pragma: no cover
def send_udp(
sock: Any,
- what: Union[dns.message.Message, bytes],
+ what: dns.message.Message | bytes,
destination: Any,
- expiration: Optional[float] = None,
+ expiration: float | None = None,
) -> Tuple[int, float]:
"""Send a DNS message to the specified UDP socket.
def receive_udp(
sock: Any,
- destination: Optional[Any] = None,
- expiration: Optional[float] = None,
+ destination: Any | None = None,
+ expiration: float | None = None,
ignore_unexpected: bool = False,
one_rr_per_rrset: bool = False,
- keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None,
- request_mac: Optional[bytes] = b"",
+ keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ request_mac: bytes | None = b"",
ignore_trailing: bool = False,
raise_on_truncation: bool = False,
ignore_errors: bool = False,
- query: Optional[dns.message.Message] = None,
+ query: dns.message.Message | None = None,
) -> Any:
"""Read a DNS message from a UDP socket.
def udp(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 53,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
ignore_unexpected: bool = False,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
raise_on_truncation: bool = False,
- sock: Optional[Any] = None,
+ sock: Any | None = None,
ignore_errors: bool = False,
) -> dns.message.Message:
"""Return the response obtained after sending a query via UDP.
def udp_with_fallback(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 53,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
ignore_unexpected: bool = False,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- udp_sock: Optional[Any] = None,
- tcp_sock: Optional[Any] = None,
+ udp_sock: Any | None = None,
+ tcp_sock: Any | None = None,
ignore_errors: bool = False,
) -> Tuple[dns.message.Message, bool]:
"""Return the response to the query, trying UDP first and falling back
def send_tcp(
sock: Any,
- what: Union[dns.message.Message, bytes],
- expiration: Optional[float] = None,
+ what: dns.message.Message | bytes,
+ expiration: float | None = None,
) -> Tuple[int, float]:
"""Send a DNS message to the specified TCP socket.
def receive_tcp(
sock: Any,
- expiration: Optional[float] = None,
+ expiration: float | None = None,
one_rr_per_rrset: bool = False,
- keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None,
- request_mac: Optional[bytes] = b"",
+ keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ request_mac: bytes | None = b"",
ignore_trailing: bool = False,
) -> Tuple[dns.message.Message, float]:
"""Read a DNS message from a TCP socket.
def tcp(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 53,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- sock: Optional[Any] = None,
+ sock: Any | None = None,
) -> dns.message.Message:
"""Return the response obtained after sending a query via TCP.
def make_ssl_context(
- verify: Union[bool, str] = True,
+ verify: bool | str = True,
check_hostname: bool = True,
- alpns: Optional[list[str]] = None,
+ alpns: list[str] | None = None,
) -> ssl.SSLContext:
"""Make an SSL context
# for backwards compatibility
def _make_dot_ssl_context(
- server_hostname: Optional[str], verify: Union[bool, str]
+ server_hostname: str | None, verify: bool | str
) -> ssl.SSLContext:
return make_ssl_context(verify, server_hostname is not None, ["dot"])
def tls(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 853,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- sock: Optional[ssl.SSLSocket] = None,
- ssl_context: Optional[ssl.SSLContext] = None,
- server_hostname: Optional[str] = None,
- verify: Union[bool, str] = True,
+ sock: ssl.SSLSocket | None = None,
+ ssl_context: ssl.SSLContext | None = None,
+ server_hostname: str | None = None,
+ verify: bool | str = True,
) -> dns.message.Message:
"""Return the response obtained after sending a query via TLS.
def quic(
q: dns.message.Message,
where: str,
- timeout: Optional[float] = None,
+ timeout: float | None = None,
port: int = 853,
- source: Optional[str] = None,
+ source: str | None = None,
source_port: int = 0,
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- connection: Optional[dns.quic.SyncQuicConnection] = None,
- verify: Union[bool, str] = True,
- hostname: Optional[str] = None,
- server_hostname: Optional[str] = None,
+ connection: dns.quic.SyncQuicConnection | None = None,
+ verify: bool | str = True,
+ hostname: str | None = None,
+ server_hostname: str | None = None,
) -> dns.message.Message:
"""Return the response obtained after sending a query via DNS-over-QUIC.
def _inbound_xfr(
txn_manager: dns.transaction.TransactionManager,
- s: Union[socket.socket, ssl.SSLSocket],
+ s: socket.socket | ssl.SSLSocket,
query: dns.message.Message,
- serial: Optional[int],
- timeout: Optional[float],
- expiration: Optional[float],
+ serial: int | None,
+ timeout: float | None,
+ expiration: float | None,
) -> Any:
"""Given a socket, does the zone transfer."""
rdtype = query.question[0].rdtype
with dns.xfr.Inbound(txn_manager, rdtype, serial, is_udp) as inbound:
done = False
tsig_ctx = None
- r: Optional[dns.message.Message] = None
+ r: dns.message.Message | None = None
while not done:
(_, mexpiration) = _compute_times(timeout)
if mexpiration is None or (
def xfr(
where: str,
- zone: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.AXFR,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
- timeout: Optional[float] = None,
+ zone: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.AXFR,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
+ timeout: float | None = None,
port: int = 53,
- keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]] = None,
- keyname: Optional[Union[dns.name.Name, str]] = None,
+ keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ keyname: dns.name.Name | str | None = None,
relativize: bool = True,
- lifetime: Optional[float] = None,
- source: Optional[str] = None,
+ lifetime: float | None = None,
+ source: str | None = None,
source_port: int = 0,
serial: int = 0,
use_udp: bool = False,
- keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
+ keyalgorithm: dns.name.Name | str = dns.tsig.default_algorithm,
) -> Any:
"""Return a generator for the responses to a zone transfer.
def inbound_xfr(
where: str,
txn_manager: dns.transaction.TransactionManager,
- query: Optional[dns.message.Message] = None,
+ query: dns.message.Message | None = None,
port: int = 53,
- timeout: Optional[float] = None,
- lifetime: Optional[float] = None,
- source: Optional[str] = None,
+ timeout: float | None = None,
+ lifetime: float | None = None,
+ source: str | None = None,
source_port: int = 0,
udp_mode: UDPMode = UDPMode.NEVER,
) -> None:
import dns.asyncbackend
if dns._features.have("doq"):
- import aioquic.quic.configuration # type: ignore
-
from dns._asyncbackend import NullContext
- from dns.quic._asyncio import (
- AsyncioQuicConnection,
- AsyncioQuicManager,
- AsyncioQuicStream,
- )
+ from dns.quic._asyncio import AsyncioQuicConnection as AsyncioQuicConnection
+ from dns.quic._asyncio import AsyncioQuicManager
+ from dns.quic._asyncio import AsyncioQuicStream as AsyncioQuicStream
from dns.quic._common import AsyncQuicConnection # pyright: ignore
- from dns.quic._common import AsyncQuicManager
+ from dns.quic._common import AsyncQuicManager as AsyncQuicManager
from dns.quic._sync import SyncQuicConnection # pyright: ignore
from dns.quic._sync import SyncQuicStream # pyright: ignore
- from dns.quic._sync import SyncQuicManager
+ from dns.quic._sync import SyncQuicManager as SyncQuicManager
have_quic = True
if dns._features.have("trio"):
import trio
- from dns.quic._trio import ( # pylint: disable=ungrouped-imports
- TrioQuicConnection,
- TrioQuicManager,
- TrioQuicStream,
+ from dns.quic._trio import (
+ TrioQuicConnection as TrioQuicConnection, # pylint: disable=ungrouped-imports
)
+ from dns.quic._trio import TrioQuicManager
+ from dns.quic._trio import TrioQuicStream as TrioQuicStream
def _trio_context_factory():
return trio.open_nursery()
import struct
import time
import urllib.parse
-from typing import Any, Optional
+from typing import Any
import aioquic.h3.connection # type: ignore
-import aioquic.h3.events # type: ignore
import aioquic.quic.configuration # type: ignore
import aioquic.quic.connection # type: ignore
class AsyncQuicConnection(BaseQuicConnection):
- async def make_stream(self, timeout: Optional[float] = None) -> Any:
+ async def make_stream(self, timeout: float | None = None) -> Any:
pass
import itertools
import random
from importlib import import_module
-from typing import Any, Dict, Optional, Tuple, Union
+from typing import Any, Dict, Tuple
import dns.exception
import dns.immutable
def to_text(
self,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
**kw: Dict[str, Any],
) -> str:
def _to_wire(
self,
file: Any,
- compress: Optional[dns.name.CompressType] = None,
- origin: Optional[dns.name.Name] = None,
+ compress: dns.name.CompressType | None = None,
+ origin: dns.name.Name | None = None,
canonicalize: bool = False,
) -> None:
raise NotImplementedError # pragma: no cover
def to_wire(
self,
- file: Optional[Any] = None,
- compress: Optional[dns.name.CompressType] = None,
- origin: Optional[dns.name.Name] = None,
+ file: Any | None = None,
+ compress: dns.name.CompressType | None = None,
+ origin: dns.name.Name | None = None,
canonicalize: bool = False,
- ) -> Optional[bytes]:
+ ) -> bytes | None:
"""Convert an rdata to wire format.
Returns a ``bytes`` if no output file was specified, or ``None`` otherwise.
self._to_wire(f, compress, origin, canonicalize)
return f.getvalue()
- def to_generic(self, origin: Optional[dns.name.Name] = None) -> "GenericRdata":
+ def to_generic(self, origin: dns.name.Name | None = None) -> "GenericRdata":
"""Creates a dns.rdata.GenericRdata equivalent of this rdata.
Returns a ``dns.rdata.GenericRdata``.
assert wire is not None # for type checkers
return GenericRdata(self.rdclass, self.rdtype, wire)
- def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes:
+ def to_digestable(self, origin: dns.name.Name | None = None) -> bytes:
"""Convert rdata to a format suitable for digesting in hashes. This
is also the DNSSEC canonical form.
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
tok: dns.tokenizer.Tokenizer,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
- relativize_to: Optional[dns.name.Name] = None,
+ relativize_to: dns.name.Name | None = None,
) -> "Rdata":
raise NotImplementedError # pragma: no cover
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
parser: dns.wire.Parser,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
) -> "Rdata":
raise NotImplementedError # pragma: no cover
cls,
value: Any,
encode: bool = False,
- max_length: Optional[int] = None,
+ max_length: int | None = None,
empty_ok: bool = True,
) -> bytes:
if encode and isinstance(value, str):
def to_text(
self,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
**kw: Dict[str, Any],
) -> str:
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(self.data)
- def to_generic(self, origin: Optional[dns.name.Name] = None) -> "GenericRdata":
+ def to_generic(self, origin: dns.name.Name | None = None) -> "GenericRdata":
return self
@classmethod
def from_text(
- rdclass: Union[dns.rdataclass.RdataClass, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- tok: Union[dns.tokenizer.Tokenizer, str],
- origin: Optional[dns.name.Name] = None,
+ rdclass: dns.rdataclass.RdataClass | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ tok: dns.tokenizer.Tokenizer | str,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
- relativize_to: Optional[dns.name.Name] = None,
- idna_codec: Optional[dns.name.IDNACodec] = None,
+ relativize_to: dns.name.Name | None = None,
+ idna_codec: dns.name.IDNACodec | None = None,
) -> Rdata:
"""Build an rdata object from text format.
def from_wire_parser(
- rdclass: Union[dns.rdataclass.RdataClass, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
+ rdclass: dns.rdataclass.RdataClass | str,
+ rdtype: dns.rdatatype.RdataType | str,
parser: dns.wire.Parser,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
) -> Rdata:
"""Build an rdata object from wire format
def from_wire(
- rdclass: Union[dns.rdataclass.RdataClass, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
+ rdclass: dns.rdataclass.RdataClass | str,
+ rdtype: dns.rdatatype.RdataType | str,
wire: bytes,
current: int,
rdlen: int,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
) -> Rdata:
"""Build an rdata object from wire format
import io
import random
import struct
-from typing import Any, Collection, Dict, List, Optional, Union, cast
+from typing import Any, Collection, Dict, List, cast
import dns.exception
import dns.immutable
# pylint: disable=arguments-differ,arguments-renamed
def add( # pyright: ignore
- self, rd: dns.rdata.Rdata, ttl: Optional[int] = None
+ self, rd: dns.rdata.Rdata, ttl: int | None = None
) -> None:
"""Add the specified rdata to the rdataset.
def to_text(
self,
- name: Optional[dns.name.Name] = None,
- origin: Optional[dns.name.Name] = None,
+ name: dns.name.Name | None = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
- override_rdclass: Optional[dns.rdataclass.RdataClass] = None,
+ override_rdclass: dns.rdataclass.RdataClass | None = None,
want_comments: bool = False,
**kw: Dict[str, Any],
) -> str:
self,
name: dns.name.Name,
file: Any,
- compress: Optional[dns.name.CompressType] = None,
- origin: Optional[dns.name.Name] = None,
- override_rdclass: Optional[dns.rdataclass.RdataClass] = None,
+ compress: dns.name.CompressType | None = None,
+ origin: dns.name.Name | None = None,
+ override_rdclass: dns.rdataclass.RdataClass | None = None,
want_shuffle: bool = True,
) -> int:
"""Convert the rdataset to wire format.
file.write(struct.pack("!HHIH", self.rdtype, rdclass, 0, 0))
return 1
else:
- l: Union[Rdataset, List[dns.rdata.Rdata]]
+ l: Rdataset | List[dns.rdata.Rdata]
if want_shuffle:
l = list(self)
random.shuffle(l)
def from_text_list(
- rdclass: Union[dns.rdataclass.RdataClass, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
+ rdclass: dns.rdataclass.RdataClass | str,
+ rdtype: dns.rdatatype.RdataType | str,
ttl: int,
text_rdatas: Collection[str],
- idna_codec: Optional[dns.name.IDNACodec] = None,
- origin: Optional[dns.name.Name] = None,
+ idna_codec: dns.name.IDNACodec | None = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
- relativize_to: Optional[dns.name.Name] = None,
+ relativize_to: dns.name.Name | None = None,
) -> Rdataset:
"""Create an rdataset with the specified class, type, and TTL, and with
the specified list of rdatas in text format.
def from_text(
- rdclass: Union[dns.rdataclass.RdataClass, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
+ rdclass: dns.rdataclass.RdataClass | str,
+ rdtype: dns.rdatatype.RdataType | str,
ttl: int,
*text_rdatas: Any,
) -> Rdataset:
"""TXT-like base class."""
-from typing import Any, Dict, Iterable, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, Tuple
import dns.exception
import dns.immutable
self,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
- strings: Iterable[Union[bytes, str]],
+ strings: Iterable[bytes | str],
):
"""Initialize a TXT-like rdata.
def to_text(
self,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
**kw: Dict[str, Any],
) -> str:
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
tok: dns.tokenizer.Tokenizer,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
- relativize_to: Optional[dns.name.Name] = None,
+ relativize_to: dns.name.Name | None = None,
) -> dns.rdata.Rdata:
strings = []
for token in tok.get_remaining():
import collections
import random
import struct
-from typing import Any, Iterable, List, Optional, Tuple, Union
+from typing import Any, Iterable, List, Tuple
import dns.exception
import dns.ipv4
name = ""
- def __init__(self, type: Any, gateway: Optional[Union[str, dns.name.Name]] = None):
+ def __init__(self, type: Any, gateway: str | dns.name.Name | None = None):
self.type = dns.rdata.Rdata._as_uint8(type)
self.gateway = gateway
self._check()
type_name = ""
- def __init__(self, windows: Optional[Iterable[Tuple[int, bytes]]] = None):
+ def __init__(self, windows: Iterable[Tuple[int, bytes]] | None = None):
last_window = -1
if windows is None:
windows = []
import threading
import time
import warnings
-from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast
+from typing import Any, Dict, Iterator, List, Sequence, Tuple, cast
from urllib.parse import urlparse
import dns._ddr
super().__init__(*args, **kwargs)
def _check_kwargs(self, qnames, responses=None): # pyright: ignore
- if not isinstance(qnames, (list, tuple, set)):
+ if not isinstance(qnames, list | tuple | set):
raise AttributeError("qnames must be a list, tuple or set")
if len(qnames) == 0:
raise AttributeError("qnames must contain at least one element")
ErrorTuple = Tuple[
- Optional[str],
+ str | None,
bool,
int,
- Union[Exception, str],
- Optional[dns.message.Message],
+ Exception | str,
+ dns.message.Message | None,
]
rdtype: dns.rdatatype.RdataType,
rdclass: dns.rdataclass.RdataClass,
response: dns.message.QueryMessage,
- nameserver: Optional[str] = None,
- port: Optional[int] = None,
+ nameserver: str | None = None,
+ port: int | None = None,
) -> None:
self.qname = qname
self.rdtype = rdtype
@classmethod
def make(
cls,
- v6: Optional[Answer] = None,
- v4: Optional[Answer] = None,
+ v6: Answer | None = None,
+ v4: Answer | None = None,
add_empty: bool = True,
) -> "HostAnswers":
answers = HostAnswers()
now = time.time()
self.next_cleaning = now + self.cleaning_interval
- def get(self, key: CacheKey) -> Optional[Answer]:
+ def get(self, key: CacheKey) -> Answer | None:
"""Get the answer associated with *key*.
Returns None if no answer is cached for the key.
self._maybe_clean()
self.data[key] = value
- def flush(self, key: Optional[CacheKey] = None) -> None:
+ def flush(self, key: CacheKey | None = None) -> None:
"""Flush the cache.
If *key* is not ``None``, only that item is flushed. Otherwise the entire cache
max_size = 1
self.max_size = max_size
- def get(self, key: CacheKey) -> Optional[Answer]:
+ def get(self, key: CacheKey) -> Answer | None:
"""Get the answer associated with *key*.
Returns None if no answer is cached for the key.
node.link_after(self.sentinel)
self.data[key] = node
- def flush(self, key: Optional[CacheKey] = None) -> None:
+ def flush(self, key: CacheKey | None = None) -> None:
"""Flush the cache.
If *key* is not ``None``, only that item is flushed. Otherwise the entire cache
def __init__(
self,
resolver: "BaseResolver",
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- rdclass: Union[dns.rdataclass.RdataClass, str],
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ rdclass: dns.rdataclass.RdataClass | str,
tcp: bool,
raise_on_no_answer: bool,
- search: Optional[bool],
+ search: bool | None,
) -> None:
if isinstance(qname, str):
qname = dns.name.from_text(qname, None)
self.nameservers: List[dns.nameserver.Nameserver] = []
self.current_nameservers: List[dns.nameserver.Nameserver] = []
self.errors: List[ErrorTuple] = []
- self.nameserver: Optional[dns.nameserver.Nameserver] = None
+ self.nameserver: dns.nameserver.Nameserver | None = None
self.tcp_attempt = False
self.retry_with_tcp = False
- self.request: Optional[dns.message.QueryMessage] = None
+ self.request: dns.message.QueryMessage | None = None
self.backoff = 0.0
def next_request(
self,
- ) -> Tuple[Optional[dns.message.QueryMessage], Optional[Answer]]:
+ ) -> Tuple[dns.message.QueryMessage | None, Answer | None]:
"""Get the next request to send, and check the cache.
Returns a (request, answer) tuple. At most one of request or
return (self.nameserver, self.tcp_attempt, backoff)
def query_result(
- self, response: Optional[dns.message.Message], ex: Optional[Exception]
- ) -> Tuple[Optional[Answer], bool]:
+ self, response: dns.message.Message | None, ex: Exception | None
+ ) -> Tuple[Answer | None, bool]:
#
# returns an (answer: Answer, end_loop: bool) tuple.
#
use_search_by_default: bool
timeout: float
lifetime: float
- keyring: Optional[Any]
- keyname: Optional[Union[dns.name.Name, str]]
- keyalgorithm: Union[dns.name.Name, str]
+ keyring: Any | None
+ keyname: dns.name.Name | str | None
+ keyalgorithm: dns.name.Name | str
edns: int
ednsflags: int
- ednsoptions: Optional[List[dns.edns.Option]]
+ ednsoptions: List[dns.edns.Option] | None
payload: int
cache: Any
- flags: Optional[int]
+ flags: int | None
retry_servfail: bool
rotate: bool
- ndots: Optional[int]
- _nameservers: Sequence[Union[str, dns.nameserver.Nameserver]]
+ ndots: int | None
+ _nameservers: Sequence[str | dns.nameserver.Nameserver]
def __init__(
self, filename: str = "/etc/resolv.conf", configure: bool = True
def _compute_timeout(
self,
start: float,
- lifetime: Optional[float] = None,
- errors: Optional[List[ErrorTuple]] = None,
+ lifetime: float | None = None,
+ errors: List[ErrorTuple] | None = None,
) -> float:
lifetime = self.lifetime if lifetime is None else lifetime
now = time.time()
return min(lifetime - duration, self.timeout)
def _get_qnames_to_try(
- self, qname: dns.name.Name, search: Optional[bool]
+ self, qname: dns.name.Name, search: bool | None
) -> List[dns.name.Name]:
# This is a separate method so we can unit test the search
# rules without requiring the Internet.
def use_tsig(
self,
keyring: Any,
- keyname: Optional[Union[dns.name.Name, str]] = None,
- algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
+ keyname: dns.name.Name | str | None = None,
+ algorithm: dns.name.Name | str = dns.tsig.default_algorithm,
) -> None:
"""Add a TSIG signature to each query.
def use_edns(
self,
- edns: Optional[Union[int, bool]] = 0,
+ edns: int | bool | None = 0,
ednsflags: int = 0,
payload: int = dns.message.DEFAULT_EDNS_PAYLOAD,
- options: Optional[List[dns.edns.Option]] = None,
+ options: List[dns.edns.Option] | None = None,
) -> None:
"""Configure EDNS behavior.
@classmethod
def _enrich_nameservers(
cls,
- nameservers: Sequence[Union[str, dns.nameserver.Nameserver]],
+ nameservers: Sequence[str | dns.nameserver.Nameserver],
nameserver_ports: Dict[str, int],
default_port: int,
) -> List[dns.nameserver.Nameserver]:
enriched_nameservers = []
- if isinstance(nameservers, (list, tuple)):
+ if isinstance(nameservers, list | tuple):
for nameserver in nameservers:
enriched_nameserver: dns.nameserver.Nameserver
if isinstance(nameserver, dns.nameserver.Nameserver):
@property
def nameservers(
self,
- ) -> Sequence[Union[str, dns.nameserver.Nameserver]]:
+ ) -> Sequence[str | dns.nameserver.Nameserver]:
return self._nameservers
@nameservers.setter
def nameservers(
- self, nameservers: Sequence[Union[str, dns.nameserver.Nameserver]]
+ self, nameservers: Sequence[str | dns.nameserver.Nameserver]
) -> None:
"""
*nameservers*, a ``list`` or ``tuple`` of nameservers, where a nameserver is either
def resolve(
self,
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
tcp: bool = False,
- source: Optional[str] = None,
+ source: str | None = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
- lifetime: Optional[float] = None,
- search: Optional[bool] = None,
+ lifetime: float | None = None,
+ search: bool | None = None,
) -> Answer: # pylint: disable=arguments-differ
"""Query nameservers to find the answer to the question.
def query(
self,
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
tcp: bool = False,
- source: Optional[str] = None,
+ source: str | None = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
- lifetime: Optional[float] = None,
+ lifetime: float | None = None,
) -> Answer: # pragma: no cover
"""Query nameservers to find the answer to the question.
def resolve_name(
self,
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
family: int = socket.AF_UNSPEC,
**kwargs: Any,
) -> HostAnswers:
# pylint: disable=redefined-outer-name
- def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
+ def canonical_name(self, name: dns.name.Name | str) -> dns.name.Name:
"""Determine the canonical name of *name*.
The canonical name is the name the resolver uses for queries
#: The default resolver.
-default_resolver: Optional[Resolver] = None
+default_resolver: Resolver | None = None
def get_default_resolver() -> Resolver:
def resolve(
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
tcp: bool = False,
- source: Optional[str] = None,
+ source: str | None = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
- lifetime: Optional[float] = None,
- search: Optional[bool] = None,
+ lifetime: float | None = None,
+ search: bool | None = None,
) -> Answer: # pragma: no cover
"""Query nameservers to find the answer to the question.
def query(
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
tcp: bool = False,
- source: Optional[str] = None,
+ source: str | None = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
- lifetime: Optional[float] = None,
+ lifetime: float | None = None,
) -> Answer: # pragma: no cover
"""Query nameservers to find the answer to the question.
def resolve_name(
- name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any
+ name: dns.name.Name | str, family: int = socket.AF_UNSPEC, **kwargs: Any
) -> HostAnswers:
"""Use a resolver to query for address records.
return get_default_resolver().resolve_name(name, family, **kwargs)
-def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name:
+def canonical_name(name: dns.name.Name | str) -> dns.name.Name:
"""Determine the canonical name of *name*.
See ``dns.resolver.Resolver.canonical_name`` for more information on the
def zone_for_name(
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
tcp: bool = False,
- resolver: Optional[Resolver] = None,
- lifetime: Optional[float] = None,
+ resolver: Resolver | None = None,
+ lifetime: float | None = None,
) -> dns.name.Name: # pyright: ignore[reportReturnType]
"""Find the name of the zone which contains the specified name.
if not name.is_absolute():
raise NotAbsolute(name)
start = time.time()
- expiration: Optional[float]
+ expiration: float | None
if lifetime is not None:
expiration = start + lifetime
else:
expiration = None
while 1:
try:
- rlifetime: Optional[float]
+ rlifetime: float | None
if expiration is not None:
rlifetime = expiration - time.time()
if rlifetime <= 0:
def make_resolver_at(
- where: Union[dns.name.Name, str],
+ where: dns.name.Name | str,
port: int = 53,
family: int = socket.AF_UNSPEC,
- resolver: Optional[Resolver] = None,
+ resolver: Resolver | None = None,
) -> Resolver:
"""Make a stub resolver using the specified destination as the full resolver.
"""
if resolver is None:
resolver = get_default_resolver()
- nameservers: List[Union[str, dns.nameserver.Nameserver]] = []
+ nameservers: List[str | dns.nameserver.Nameserver] = []
if isinstance(where, str) and dns.inet.is_address(where):
nameservers.append(dns.nameserver.Do53Nameserver(where, port))
else:
def resolve_at(
- where: Union[dns.name.Name, str],
- qname: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
- rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ where: dns.name.Name | str,
+ qname: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.A,
+ rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
tcp: bool = False,
- source: Optional[str] = None,
+ source: str | None = None,
raise_on_no_answer: bool = True,
source_port: int = 0,
- lifetime: Optional[float] = None,
- search: Optional[bool] = None,
+ lifetime: float | None = None,
+ search: bool | None = None,
port: int = 53,
family: int = socket.AF_UNSPEC,
- resolver: Optional[Resolver] = None,
+ resolver: Resolver | None = None,
) -> Answer:
"""Query nameservers to find the answer to the question.
socket.SOCK_STREAM: [socket.SOL_TCP],
}
-_resolver: Optional[Resolver] = None
+_resolver: Resolver | None = None
_original_getaddrinfo = socket.getaddrinfo
_original_getnameinfo = socket.getnameinfo
_original_getfqdn = socket.getfqdn
return (canonical, aliases, addresses)
-def override_system_resolver(resolver: Optional[Resolver] = None) -> None:
+def override_system_resolver(resolver: Resolver | None = None) -> None:
"""Override the system resolver routines in the socket module with
versions which use dnspython's resolver.
"""DNS RRsets (an RRset is a named rdataset)"""
-from typing import Any, Collection, Dict, Optional, Union, cast
+from typing import Any, Collection, Dict, cast
import dns.name
import dns.rdata
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
- deleting: Optional[dns.rdataclass.RdataClass] = None,
+ deleting: dns.rdataclass.RdataClass | None = None,
):
"""Create a new RRset."""
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType,
- deleting: Optional[dns.rdataclass.RdataClass] = None,
+ deleting: dns.rdataclass.RdataClass | None = None,
) -> bool:
"""Returns ``True`` if this rrset matches the specified name, class,
type, covers, and deletion state.
def to_text( # type: ignore[override]
self,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
**kw: Dict[str, Any],
) -> str:
def to_wire( # type: ignore[override]
self,
file: Any,
- compress: Optional[dns.name.CompressType] = None, # type: ignore
- origin: Optional[dns.name.Name] = None,
+ compress: dns.name.CompressType | None = None, # type: ignore
+ origin: dns.name.Name | None = None,
**kw: Dict[str, Any],
) -> int:
"""Convert the RRset to wire format.
def from_text_list(
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
ttl: int,
- rdclass: Union[dns.rdataclass.RdataClass, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
+ rdclass: dns.rdataclass.RdataClass | str,
+ rdtype: dns.rdatatype.RdataType | str,
text_rdatas: Collection[str],
- idna_codec: Optional[dns.name.IDNACodec] = None,
- origin: Optional[dns.name.Name] = None,
+ idna_codec: dns.name.IDNACodec | None = None,
+ origin: dns.name.Name | None = None,
relativize: bool = True,
- relativize_to: Optional[dns.name.Name] = None,
+ relativize_to: dns.name.Name | None = None,
) -> RRset:
"""Create an RRset with the specified name, TTL, class, and type, and with
the specified list of rdatas in text format.
def from_text(
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
ttl: int,
- rdclass: Union[dns.rdataclass.RdataClass, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
+ rdclass: dns.rdataclass.RdataClass | str,
+ rdtype: dns.rdatatype.RdataType | str,
*text_rdatas: Any,
) -> RRset:
"""Create an RRset with the specified name, TTL, class, and type and with
def from_rdata_list(
- name: Union[dns.name.Name, str],
+ name: dns.name.Name | str,
ttl: int,
rdatas: Collection[dns.rdata.Rdata],
- idna_codec: Optional[dns.name.IDNACodec] = None,
+ idna_codec: dns.name.IDNACodec | None = None,
) -> RRset:
"""Create an RRset with the specified name and TTL, and with
the specified list of rdata objects.
return r
-def from_rdata(name: Union[dns.name.Name, str], ttl: int, *rdatas: Any) -> RRset:
+def from_rdata(name: dns.name.Name | str, ttl: int, *rdatas: Any) -> RRset:
"""Create an RRset with the specified name and TTL, and with
the specified rdata objects.
import io
import sys
-from typing import Any, List, Optional, Tuple
+from typing import Any, List, Tuple
import dns.exception
import dns.name
ttype: int,
value: Any = "",
has_escape: bool = False,
- comment: Optional[str] = None,
+ comment: str | None = None,
):
"""Initialize a token instance."""
def __init__(
self,
f: Any = sys.stdin,
- filename: Optional[str] = None,
- idna_codec: Optional[dns.name.IDNACodec] = None,
+ filename: str | None = None,
+ idna_codec: dns.name.IDNACodec | None = None,
):
"""Initialize a tokenizer instance.
else:
filename = "<file>"
self.file = f
- self.ungotten_char: Optional[str] = None
- self.ungotten_token: Optional[Token] = None
+ self.ungotten_char: str | None = None
+ self.ungotten_token: Token | None = None
self.multiline = 0
self.quoting = False
self.eof = False
)
return value
- def get_string(self, max_length: Optional[int] = None) -> str:
+ def get_string(self, max_length: int | None = None) -> str:
"""Read the next token and interpret it as a string.
Raises dns.exception.SyntaxError if not a string.
raise dns.exception.SyntaxError("expecting an identifier")
return token.value
- def get_remaining(self, max_tokens: Optional[int] = None) -> List[Token]:
+ def get_remaining(self, max_tokens: int | None = None) -> List[Token]:
"""Return the remaining tokens on the line, until an EOL or EOF is seen.
max_tokens: If not None, stop after this number of tokens.
def as_name(
self,
token: Token,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = False,
- relativize_to: Optional[dns.name.Name] = None,
+ relativize_to: dns.name.Name | None = None,
) -> dns.name.Name:
"""Try to interpret the token as a DNS name.
def get_name(
self,
- origin: Optional[dns.name.Name] = None,
+ origin: dns.name.Name | None = None,
relativize: bool = False,
- relativize_to: Optional[dns.name.Name] = None,
+ relativize_to: dns.name.Name | None = None,
) -> dns.name.Name:
"""Read the next token and interpret it as a DNS name.
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import collections
-from typing import Any, Callable, Iterator, List, Optional, Tuple, Union
+from typing import Any, Callable, Iterator, List, Tuple
import dns.exception
import dns.name
def origin_information(
self,
- ) -> Tuple[Optional[dns.name.Name], bool, Optional[dns.name.Name]]:
+ ) -> Tuple[dns.name.Name | None, bool, dns.name.Name | None]:
"""Returns a tuple
(absolute_origin, relativize, effective_origin)
"""The class of the transaction manager."""
raise NotImplementedError # pragma: no cover
- def from_wire_origin(self) -> Optional[dns.name.Name]:
+ def from_wire_origin(self) -> dns.name.Name | None:
"""Origin to use in from_wire() calls."""
(absolute_origin, relativize, _) = self.origin_information()
if relativize:
def get(
self,
- name: Optional[Union[dns.name.Name, str]],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ name: dns.name.Name | str | None,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> dns.rdataset.Rdataset:
"""Return the rdataset associated with *name*, *rdtype*, and *covers*,
or `None` if not found.
rdataset = self._get_rdataset(name, rdtype, covers)
return _ensure_immutable_rdataset(rdataset)
- def get_node(self, name: dns.name.Name) -> Optional[dns.node.Node]:
+ def get_node(self, name: dns.name.Name) -> dns.node.Node | None:
"""Return the node at *name*, if any.
Returns an immutable node or ``None``.
self._check_read_only()
self._delete(True, args)
- def name_exists(self, name: Union[dns.name.Name, str]) -> bool:
+ def name_exists(self, name: dns.name.Name | str) -> bool:
"""Does the specified name exist?"""
self._check_ended()
if isinstance(name, str):
import hashlib
import hmac
import struct
-from typing import Union
import dns.exception
import dns.name
class Key:
def __init__(
self,
- name: Union[dns.name.Name, str],
- secret: Union[bytes, str],
- algorithm: Union[dns.name.Name, str] = default_algorithm,
+ name: dns.name.Name | str,
+ secret: bytes | str,
+ algorithm: dns.name.Name | str = default_algorithm,
):
if isinstance(name, str):
name = dns.name.from_text(name)
"""DNS TTL conversion."""
-from typing import Union
-
import dns.exception
# Technically TTLs are supposed to be between 0 and 2**31 - 1, with values
return total
-def make(value: Union[int, str]) -> int:
+def make(value: int | str) -> int:
if isinstance(value, int):
return value
elif isinstance(value, str):
"""DNS Dynamic Update Support"""
-from typing import Any, List, Optional, Union
+from typing import Any, List
import dns.enum
import dns.exception
def __init__(
self,
- zone: Optional[Union[dns.name.Name, str]] = None,
+ zone: dns.name.Name | str | None = None,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
- keyring: Optional[Any] = None,
- keyname: Optional[dns.name.Name] = None,
- keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
- id: Optional[int] = None,
+ keyring: Any | None = None,
+ keyname: dns.name.Name | None = None,
+ keyalgorithm: dns.name.Name | str = dns.tsig.default_algorithm,
+ id: int | None = None,
):
"""Initialize a new DNS Update object.
rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s, self.origin)
self._add_rr(name, ttl, rd, section=section)
- def add(self, name: Union[dns.name.Name, str], *args: Any) -> None:
+ def add(self, name: dns.name.Name | str, *args: Any) -> None:
"""Add records.
The first argument is always a name. The other
self._add(False, self.update, name, *args)
- def delete(self, name: Union[dns.name.Name, str], *args: Any) -> None:
+ def delete(self, name: dns.name.Name | str, *args: Any) -> None:
"""Delete records.
The first argument is always a name. The other
)
self._add_rr(name, 0, rd, dns.rdataclass.NONE)
- def replace(self, name: Union[dns.name.Name, str], *args: Any) -> None:
+ def replace(self, name: dns.name.Name | str, *args: Any) -> None:
"""Replace records.
The first argument is always a name. The other
self._add(True, self.update, name, *args)
- def present(self, name: Union[dns.name.Name, str], *args: Any) -> None:
+ def present(self, name: dns.name.Name | str, *args: Any) -> None:
"""Require that an owner name (and optionally an rdata type,
or specific rdataset) exists as a prerequisite to the
execution of the update.
def absent(
self,
- name: Union[dns.name.Name, str],
- rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None,
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str | None = None,
) -> None:
"""Require that an owner name (and optionally an rdata type) does
not exist as a prerequisite to the execution of the update."""
import collections
import threading
-from typing import Callable, Deque, Optional, Set, Union, cast
+from typing import Callable, Deque, Set, cast
import dns.exception
-import dns.immutable
import dns.name
import dns.node
import dns.rdataclass
def __init__(
self,
- origin: Optional[Union[dns.name.Name, str]],
+ origin: dns.name.Name | str | None,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
relativize: bool = True,
- pruning_policy: Optional[Callable[["Zone", Version], Optional[bool]]] = None,
+ pruning_policy: Callable[["Zone", Version], bool | None] | None = None,
):
"""Initialize a versioned zone object.
self._pruning_policy = self._default_pruning_policy
else:
self._pruning_policy = pruning_policy
- self._write_txn: Optional[Transaction] = None
- self._write_event: Optional[threading.Event] = None
+ self._write_txn: Transaction | None = None
+ self._write_event: threading.Event | None = None
self._write_waiters: Deque[threading.Event] = collections.deque()
self._readers: Set[Transaction] = set()
self._commit_version_unlocked(
)
def reader(
- self, id: Optional[int] = None, serial: Optional[int] = None
+ self, id: int | None = None, serial: int | None = None
) -> Transaction: # pylint: disable=arguments-differ
if id is not None and serial is not None:
raise ValueError("cannot specify both id and serial")
):
self._versions.popleft()
- def set_max_versions(self, max_versions: Optional[int]) -> None:
+ def set_max_versions(self, max_versions: int | None) -> None:
"""Set a pruning policy that retains up to the specified number
of versions
"""
self.set_pruning_policy(policy)
def set_pruning_policy(
- self, policy: Optional[Callable[["Zone", Version], Optional[bool]]]
+ self, policy: Callable[["Zone", Version], bool | None] | None
) -> None:
"""Set the pruning policy for the zone.
return id
def find_node(
- self, name: Union[dns.name.Name, str], create: bool = False
+ self, name: dns.name.Name | str, create: bool = False
) -> dns.node.Node:
if create:
raise UseTransaction
return super().find_node(name)
- def delete_node(self, name: Union[dns.name.Name, str]) -> None:
+ def delete_node(self, name: dns.name.Name | str) -> None:
raise UseTransaction
def find_rdataset(
self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
create: bool = False,
) -> dns.rdataset.Rdataset:
if create:
def get_rdataset(
self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
create: bool = False,
- ) -> Optional[dns.rdataset.Rdataset]:
+ ) -> dns.rdataset.Rdataset | None:
if create:
raise UseTransaction
rdataset = super().get_rdataset(name, rdtype, covers)
def delete_rdataset(
self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> None:
raise UseTransaction
def replace_rdataset(
- self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset
+ self, name: dns.name.Name | str, replacement: dns.rdataset.Rdataset
) -> None:
raise UseTransaction
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-from typing import Any, List, Optional, Tuple, Union, cast
+from typing import Any, List, Tuple, cast
import dns.edns
import dns.exception
self,
txn_manager: dns.transaction.TransactionManager,
rdtype: dns.rdatatype.RdataType = dns.rdatatype.AXFR,
- serial: Optional[int] = None,
+ serial: int | None = None,
is_udp: bool = False,
):
"""Initialize an inbound zone transfer.
XFR.
"""
self.txn_manager = txn_manager
- self.txn: Optional[dns.transaction.Transaction] = None
+ self.txn: dns.transaction.Transaction | None = None
self.rdtype = rdtype
if rdtype == dns.rdatatype.IXFR:
if serial is None:
self.serial = serial
self.is_udp = is_udp
(_, _, self.origin) = txn_manager.origin_information()
- self.soa_rdataset: Optional[dns.rdataset.Rdataset] = None
+ self.soa_rdataset: dns.rdataset.Rdataset | None = None
self.done = False
self.expecting_SOA = False
self.delete_mode = False
def make_query(
txn_manager: dns.transaction.TransactionManager,
- serial: Optional[int] = 0,
- use_edns: Optional[Union[int, bool]] = None,
- ednsflags: Optional[int] = None,
- payload: Optional[int] = None,
- request_payload: Optional[int] = None,
- options: Optional[List[dns.edns.Option]] = None,
+ serial: int | None = 0,
+ use_edns: int | bool | None = None,
+ ednsflags: int | None = None,
+ payload: int | None = None,
+ request_payload: int | None = None,
+ options: List[dns.edns.Option] | None = None,
keyring: Any = None,
- keyname: Optional[dns.name.Name] = None,
- keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
-) -> Tuple[dns.message.QueryMessage, Optional[int]]:
+ keyname: dns.name.Name | None = None,
+ keyalgorithm: dns.name.Name | str = dns.tsig.default_algorithm,
+) -> Tuple[dns.message.QueryMessage, int | None]:
"""Make an AXFR or IXFR query.
*txn_manager* is a ``dns.transaction.TransactionManager``, typically a
return (q, serial)
-def extract_serial_from_query(query: dns.message.Message) -> Optional[int]:
+def extract_serial_from_query(query: dns.message.Message) -> int | None:
"""Extract the SOA serial number from query if it is an IXFR and return
it, otherwise return None.
Iterator,
List,
MutableMapping,
- Optional,
Set,
Tuple,
- Union,
cast,
)
def _validate_name(
name: dns.name.Name,
- origin: Optional[dns.name.Name],
+ origin: dns.name.Name | None,
relativize: bool,
) -> dns.name.Name:
# This name validation code is shared by Zone and Version
map_factory: Callable[[], MutableMapping[dns.name.Name, dns.node.Node]] = dict
# We only require the version types as "Version" to allow for flexibility, as
# only the version protocol matters
- writable_version_factory: Optional[Callable[["Zone", bool], "Version"]] = None
- immutable_version_factory: Optional[Callable[["Version"], "Version"]] = None
+ writable_version_factory: Callable[["Zone", bool], "Version"] | None = None
+ immutable_version_factory: Callable[["Version"], "Version"] | None = None
__slots__ = ["rdclass", "origin", "nodes", "relativize"]
def __init__(
self,
- origin: Optional[Union[dns.name.Name, str]],
+ origin: dns.name.Name | str | None,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
relativize: bool = True,
):
return not self.__eq__(other)
- def _validate_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
+ def _validate_name(self, name: dns.name.Name | str) -> dns.name.Name:
# Note that any changes in this method should have corresponding changes
# made in the Version _validate_name() method.
if isinstance(name, str):
return key in self.nodes
def find_node(
- self, name: Union[dns.name.Name, str], create: bool = False
+ self, name: dns.name.Name | str, create: bool = False
) -> dns.node.Node:
"""Find a node in the zone, possibly creating it.
return node
def get_node(
- self, name: Union[dns.name.Name, str], create: bool = False
- ) -> Optional[dns.node.Node]:
+ self, name: dns.name.Name | str, create: bool = False
+ ) -> dns.node.Node | None:
"""Get a node in the zone, possibly creating it.
This method is like ``find_node()``, except it returns None instead
node = None
return node
- def delete_node(self, name: Union[dns.name.Name, str]) -> None:
+ def delete_node(self, name: dns.name.Name | str) -> None:
"""Delete the specified node if it exists.
*name*: the name of the node to find.
def find_rdataset(
self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
create: bool = False,
) -> dns.rdataset.Rdataset:
"""Look for an rdataset with the specified name and type in the zone,
def get_rdataset(
self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
create: bool = False,
- ) -> Optional[dns.rdataset.Rdataset]:
+ ) -> dns.rdataset.Rdataset | None:
"""Look for an rdataset with the specified name and type in the zone.
This method is like ``find_rdataset()``, except it returns None instead
def delete_rdataset(
self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> None:
"""Delete the rdataset matching *rdtype* and *covers*, if it
exists at the node specified by *name*.
self.delete_node(name)
def replace_rdataset(
- self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset
+ self, name: dns.name.Name | str, replacement: dns.rdataset.Rdataset
) -> None:
"""Replace an rdataset at name.
def find_rrset(
self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> dns.rrset.RRset:
"""Look for an rdataset with the specified name and type in the zone,
and return an RRset encapsulating it.
def get_rrset(
self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
- ) -> Optional[dns.rrset.RRset]:
+ name: dns.name.Name | str,
+ rdtype: dns.rdatatype.RdataType | str,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
+ ) -> dns.rrset.RRset | None:
"""Look for an rdataset with the specified name and type in the zone,
and return an RRset encapsulating it.
def iterate_rdatasets(
self,
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY,
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.ANY,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]:
"""Return a generator which yields (name, rdataset) tuples for
all rdatasets in the zone which have the specified *rdtype*
def iterate_rdatas(
self,
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY,
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.ANY,
+ covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
) -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]:
"""Return a generator which yields (name, ttl, rdata) tuples for
all rdatas in the zone which have the specified *rdtype*
f: Any,
sorted: bool = True,
relativize: bool = True,
- nl: Optional[str] = None,
+ nl: str | None = None,
want_comments: bool = False,
want_origin: bool = False,
) -> None:
self,
sorted: bool = True,
relativize: bool = True,
- nl: Optional[str] = None,
+ nl: str | None = None,
want_comments: bool = False,
want_origin: bool = False,
) -> str:
raise NoNS
def get_soa(
- self, txn: Optional[dns.transaction.Transaction] = None
+ self, txn: dns.transaction.Transaction | None = None
) -> dns.rdtypes.ANY.SOA.SOA:
"""Get the zone SOA rdata.
# an SOA if there is no origin.
raise NoSOA
origin_name = self.origin
- soa_rds: Optional[dns.rdataset.Rdataset]
+ soa_rds: dns.rdataset.Rdataset | None
if txn:
soa_rds = txn.get(origin_name, dns.rdatatype.SOA)
else:
)
def verify_digest(
- self, zonemd: Optional[dns.rdtypes.ANY.ZONEMD.ZONEMD] = None
+ self, zonemd: dns.rdtypes.ANY.ZONEMD.ZONEMD | None = None
) -> None:
- digests: Union[dns.rdataset.Rdataset, List[dns.rdtypes.ANY.ZONEMD.ZONEMD]]
+ digests: dns.rdataset.Rdataset | List[dns.rdtypes.ANY.ZONEMD.ZONEMD]
if zonemd:
digests = [zonemd]
else:
def origin_information(
self,
- ) -> Tuple[Optional[dns.name.Name], bool, Optional[dns.name.Name]]:
- effective: Optional[dns.name.Name]
+ ) -> Tuple[dns.name.Name | None, bool, dns.name.Name | None]:
+ effective: dns.name.Name | None
if self.relativize:
effective = dns.name.empty
else:
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
create: bool = False,
- ) -> Optional[dns.rdataset.Rdataset]:
+ ) -> dns.rdataset.Rdataset | None:
if create:
raise TypeError("immutable")
return super().get_rdataset(rdclass, rdtype, covers, False)
self,
zone: Zone,
id: int,
- nodes: Optional[MutableMapping[dns.name.Name, dns.node.Node]] = None,
- origin: Optional[dns.name.Name] = None,
+ nodes: MutableMapping[dns.name.Name, dns.node.Node] | None = None,
+ origin: dns.name.Name | None = None,
):
self.zone = zone
self.id = id
def _validate_name(self, name: dns.name.Name) -> dns.name.Name:
return _validate_name(name, self.origin, self.zone.relativize)
- def get_node(self, name: dns.name.Name) -> Optional[dns.node.Node]:
+ def get_node(self, name: dns.name.Name) -> dns.node.Node | None:
name = self._validate_name(name)
return self.nodes.get(name)
name: dns.name.Name,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType,
- ) -> Optional[dns.rdataset.Rdataset]:
+ ) -> dns.rdataset.Rdataset | None:
node = self.get_node(name)
if node is None:
return None
def _from_text(
text: Any,
- origin: Optional[Union[dns.name.Name, str]] = None,
+ origin: dns.name.Name | str | None = None,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
relativize: bool = True,
zone_factory: Any = Zone,
- filename: Optional[str] = None,
+ filename: str | None = None,
allow_include: bool = False,
check_origin: bool = True,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- allow_directives: Union[bool, Iterable[str]] = True,
+ idna_codec: dns.name.IDNACodec | None = None,
+ allow_directives: bool | Iterable[str] = True,
) -> Zone:
# See the comments for the public APIs from_text() and from_file() for
# details.
def from_text(
text: str,
- origin: Optional[Union[dns.name.Name, str]] = None,
+ origin: dns.name.Name | str | None = None,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
relativize: bool = True,
zone_factory: Any = Zone,
- filename: Optional[str] = None,
+ filename: str | None = None,
allow_include: bool = False,
check_origin: bool = True,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- allow_directives: Union[bool, Iterable[str]] = True,
+ idna_codec: dns.name.IDNACodec | None = None,
+ allow_directives: bool | Iterable[str] = True,
) -> Zone:
"""Build a zone object from a zone file format string.
def from_file(
f: Any,
- origin: Optional[Union[dns.name.Name, str]] = None,
+ origin: dns.name.Name | str | None = None,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
relativize: bool = True,
zone_factory: Any = Zone,
- filename: Optional[str] = None,
+ filename: str | None = None,
allow_include: bool = True,
check_origin: bool = True,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- allow_directives: Union[bool, Iterable[str]] = True,
+ idna_codec: dns.name.IDNACodec | None = None,
+ allow_directives: bool | Iterable[str] = True,
) -> Zone:
"""Read a zone file and build a zone object.
import re
import sys
-from typing import Any, Iterable, List, Optional, Set, Tuple, Union, cast
+from typing import Any, Iterable, List, Set, Tuple, cast
import dns.exception
import dns.grange
SavedStateType = Tuple[
dns.tokenizer.Tokenizer,
- Optional[dns.name.Name], # current_origin
- Optional[dns.name.Name], # last_name
- Optional[Any], # current_file
+ dns.name.Name | None, # current_origin
+ dns.name.Name | None, # last_name
+ Any | None, # current_file
int, # last_ttl
bool, # last_ttl_known
int, # default_ttl
rdclass: dns.rdataclass.RdataClass,
txn: dns.transaction.Transaction,
allow_include: bool = False,
- allow_directives: Union[bool, Iterable[str]] = True,
- force_name: Optional[dns.name.Name] = None,
- force_ttl: Optional[int] = None,
- force_rdclass: Optional[dns.rdataclass.RdataClass] = None,
- force_rdtype: Optional[dns.rdatatype.RdataType] = None,
- default_ttl: Optional[int] = None,
+ allow_directives: bool | Iterable[str] = True,
+ force_name: dns.name.Name | None = None,
+ force_ttl: int | None = None,
+ force_rdclass: dns.rdataclass.RdataClass | None = None,
+ force_rdtype: dns.rdatatype.RdataType | None = None,
+ default_ttl: int | None = None,
):
self.tok = tok
(self.zone_origin, self.relativize, _) = txn.manager.origin_information()
self.zone_rdclass = rdclass
self.txn = txn
self.saved_state: List[SavedStateType] = []
- self.current_file: Optional[Any] = None
+ self.current_file: Any | None = None
self.allowed_directives: Set[str]
if allow_directives is True:
self.allowed_directives = {"$GENERATE", "$ORIGIN", "$TTL"}
token = self.tok.get()
filename = token.value
token = self.tok.get()
- new_origin: Optional[dns.name.Name]
+ new_origin: dns.name.Name | None
if token.is_identifier():
new_origin = dns.name.from_text(
token.value, self.current_origin, self.tok.idna_codec
class RRSetsReaderManager(dns.transaction.TransactionManager):
def __init__(
self,
- origin: Optional[dns.name.Name] = dns.name.root,
+ origin: dns.name.Name | None = dns.name.root,
relativize: bool = False,
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
):
def read_rrsets(
text: Any,
- name: Optional[Union[dns.name.Name, str]] = None,
- ttl: Optional[int] = None,
- rdclass: Optional[Union[dns.rdataclass.RdataClass, str]] = dns.rdataclass.IN,
- default_rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
- rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None,
- default_ttl: Optional[Union[int, str]] = None,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- origin: Optional[Union[dns.name.Name, str]] = dns.name.root,
+ name: dns.name.Name | str | None = None,
+ ttl: int | None = None,
+ rdclass: dns.rdataclass.RdataClass | str | None = dns.rdataclass.IN,
+ default_rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
+ rdtype: dns.rdatatype.RdataType | str | None = None,
+ default_ttl: int | str | None = None,
+ idna_codec: dns.name.IDNACodec | None = None,
+ origin: dns.name.Name | str | None = dns.name.root,
relativize: bool = False,
) -> List[dns.rrset.RRset]:
"""Read one or more rrsets from the specified text, possibly subject
lint.ignore = [
"E501",
"E741",
- "F401",
"I001",
"B904",
"B011",
"UP006",
"UP035",
- # these three are about post-3.9 type syntax and should be revisited after 2.8
- "UP045",
- "UP007",
- "UP038",
]
lint.exclude = ["tests/*"]