import importlib.metadata
import itertools
import string
-from typing import Dict, List, Tuple
-def _tuple_from_text(version: str) -> Tuple:
+def _tuple_from_text(version: str) -> tuple:
text_parts = version.split(".")
int_parts = []
for text_part in text_parts:
return True
-_cache: Dict[str, bool] = {}
+_cache: dict[str, bool] = {}
def have(feature: str) -> bool:
_cache[feature] = enabled
-_requirements: Dict[str, List[str]] = {
+_requirements: dict[str, list[str]] = {
### BEGIN generated requirements
"dnssec": ["cryptography>=45"],
"doh": ["httpcore>=1.0.0", "httpx>=0.28.0", "h2>=4.2.0"],
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import os
-from typing import Tuple
def convert_verify_to_cafile_and_capath(
verify: bool | str,
-) -> Tuple[str | None, str | None]:
+) -> tuple[str | None, str | None]:
cafile: str | None = None
capath: str | None = None
if isinstance(verify, str):
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
-from typing import Dict
-
import dns.exception
# pylint: disable=unused-import
_default_backend = None
-_backends: Dict[str, Backend] = {}
+_backends: dict[str, Backend] = {}
# Allow sniffio import to be disabled for testing purposes
_no_sniffio = False
import struct
import time
import urllib.parse
-from typing import Any, Dict, Optional, Tuple, cast
+from typing import Any, cast
import dns.asyncbackend
import dns.exception
what: dns.message.Message | bytes,
destination: Any,
expiration: float | None = None,
-) -> Tuple[int, float]:
+) -> tuple[int, float]:
"""Send a DNS message to the specified UDP socket.
*sock*, a ``dns.asyncbackend.DatagramSocket``.
expiration: float | None = None,
ignore_unexpected: bool = False,
one_rr_per_rrset: bool = False,
- keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ keyring: dict[dns.name.Name, dns.tsig.Key] | None = None,
request_mac: bytes | None = b"",
ignore_trailing: bool = False,
raise_on_truncation: bool = False,
tcp_sock: dns.asyncbackend.StreamSocket | None = None,
backend: dns.asyncbackend.Backend | None = None,
ignore_errors: bool = False,
-) -> Tuple[dns.message.Message, bool]:
+) -> tuple[dns.message.Message, bool]:
"""Return the response to the query, trying UDP first and falling back
to TCP if UDP results in a truncated response.
sock: dns.asyncbackend.StreamSocket,
what: dns.message.Message | bytes,
expiration: float | None = None,
-) -> Tuple[int, float]:
+) -> tuple[int, float]:
"""Send a DNS message to the specified TCP socket.
*sock*, a ``dns.asyncbackend.StreamSocket``.
sock: dns.asyncbackend.StreamSocket,
expiration: float | None = None,
one_rr_per_rrset: bool = False,
- keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ keyring: dict[dns.name.Name, dns.tsig.Key] | None = None,
request_mac: bytes | None = b"",
ignore_trailing: bool = False,
-) -> Tuple[dns.message.Message, float]:
+) -> tuple[dns.message.Message, float]:
"""Read a DNS message from a TCP socket.
*sock*, a ``dns.asyncbackend.StreamSocket``.
def _maybe_get_resolver(
- resolver: Optional["dns.asyncresolver.Resolver"], # type: ignore
+ resolver: "dns.asyncresolver.Resolver | None", # type: ignore
) -> "dns.asyncresolver.Resolver": # type: ignore
# We need a separate method for this to avoid overriding the global
# variable "dns" with the as-yet undefined local variable "dns"
source_port: int = 0, # pylint: disable=W0613
one_rr_per_rrset: bool = False,
ignore_trailing: bool = False,
- client: Optional["httpx.AsyncClient|dns.quic.AsyncQuicConnection"] = None,
+ client: "httpx.AsyncClient|dns.quic.AsyncQuicConnection | None" = None,
path: str = "/dns-query",
post: bool = True,
verify: bool | str | ssl.SSLContext = True,
bootstrap_address: str | None = None,
- resolver: Optional["dns.asyncresolver.Resolver"] = None, # type: ignore
+ resolver: "dns.asyncresolver.Resolver | None" = None, # type: ignore
family: int = socket.AF_UNSPEC,
http_version: HTTPVersion = HTTPVersion.DEFAULT,
) -> dns.message.Message:
import socket
import time
-from typing import Any, Dict, List
+from typing import Any
import dns._ddr
import dns.asyncbackend
import dns.asyncquery
-import dns.exception
import dns.inet
import dns.name
import dns.nameserver
# We make a modified kwargs for type checking happiness, as otherwise
# we get a legit warning about possibly having rdtype and rdclass
# in the kwargs more than once.
- modified_kwargs: Dict[str, Any] = {}
+ modified_kwargs: dict[str, Any] = {}
modified_kwargs.update(kwargs)
modified_kwargs["rdtype"] = dns.rdatatype.PTR
modified_kwargs["rdclass"] = dns.rdataclass.IN
# We make a modified kwargs for type checking happiness, as otherwise
# we get a legit warning about possibly having rdtype and rdclass
# in the kwargs more than once.
- modified_kwargs: Dict[str, Any] = {}
+ modified_kwargs: dict[str, Any] = {}
modified_kwargs.update(kwargs)
modified_kwargs.pop("rdtype", None)
modified_kwargs["rdclass"] = dns.rdataclass.IN
"""
if resolver is None:
resolver = get_default_resolver()
- nameservers: List[str | dns.nameserver.Nameserver] = []
+ nameservers: list[str | dns.nameserver.Nameserver] = []
if isinstance(where, str) and dns.inet.is_address(where):
nameservers.append(dns.nameserver.Do53Nameserver(where, port))
else:
insertion.
"""
-from collections.abc import MutableMapping, MutableSet
-from typing import Callable, Generic, Optional, Tuple, TypeVar, cast
+from collections.abc import Callable, MutableMapping, MutableSet
+from typing import Generic, TypeVar, cast
DEFAULT_T = 127
else:
return child
- def _get_node(self, key: KT) -> Tuple[Optional["_Node[KT, ET]"], int]:
+ def _get_node(self, key: KT) -> tuple["_Node[KT, ET] | None", int]:
"""Get the node associated with key and its index, doing
copy-on-write if we have to descend.
left.merge(parent, index - 1)
def delete(
- self, key: KT, parent: Optional["_Node[KT, ET]"], exact: ET | None
+ self, key: KT, parent: "_Node[KT, ET] | None", exact: ET | None
) -> ET | None:
"""Delete an element matching *key* if it exists. If *exact* is not ``None``
then it must be an exact match with that element. The Node must not be
for child in self.children:
child._visit_preorder_by_node(visit)
- def maybe_cow(self, creator: _Creator) -> Optional["_Node[KT, ET]"]:
+ def maybe_cow(self, creator: _Creator) -> "_Node[KT, ET] | None":
"""Return a clone of this Node if it was not created by *creator*, or ``None``
otherwise (i.e. copy for copy-on-write if we haven't already copied it)."""
if self.creator is not creator:
class BTree(Generic[KT, ET]):
"""An in-memory BTree with copy-on-write and cursors."""
- def __init__(self, *, t: int = DEFAULT_T, original: Optional["BTree"] = None):
+ def __init__(self, *, t: int = DEFAULT_T, original: "BTree | None" = None):
"""Create a BTree.
If *original* is not ``None``, then the BTree is shallow-cloned from
# points, and the GLUE flag is set on nodes beneath delegation points.
import enum
+from collections.abc import Callable, MutableMapping
from dataclasses import dataclass
-from typing import Callable, MutableMapping, Tuple, cast
+from typing import cast
import dns.btree
import dns.immutable
class Delegations(dns.btree.BTreeSet[dns.name.Name]):
- def get_delegation(self, name: dns.name.Name) -> Tuple[dns.name.Name | None, bool]:
+ def get_delegation(self, name: dns.name.Name) -> tuple[dns.name.Name | None, bool]:
"""Get the delegation applicable to *name*, if it exists.
If there delegation, then return a tuple consisting of the name of
def _maybe_cow_with_name(
self, name: dns.name.Name
- ) -> Tuple[dns.node.Node, dns.name.Name]:
+ ) -> tuple[dns.node.Node, dns.name.Name]:
(node, name) = super()._maybe_cow_with_name(name)
node = cast(Node, node)
if self._is_origin(name):
import hashlib
import struct
import time
+from collections.abc import Callable
from datetime import datetime
-from typing import Callable, Dict, List, Set, Tuple, Union, cast
+from typing import Union, cast
import dns._features
import dns.name
def _find_candidate_keys(
- keys: Dict[dns.name.Name, dns.rdataset.Rdataset | dns.node.Node], rrsig: RRSIG
-) -> List[DNSKEY] | None:
+ keys: dict[dns.name.Name, dns.rdataset.Rdataset | dns.node.Node], rrsig: RRSIG
+) -> list[DNSKEY] | None:
value = keys.get(rrsig.signer)
if isinstance(value, dns.node.Node):
rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY)
def _get_rrname_rdataset(
- rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
-) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]:
+ rrset: dns.rrset.RRset | tuple[dns.name.Name, dns.rdataset.Rdataset],
+) -> tuple[dns.name.Name, dns.rdataset.Rdataset]:
if isinstance(rrset, tuple):
return rrset[0], rrset[1]
else:
def _validate_rrsig(
- rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
+ rrset: dns.rrset.RRset | tuple[dns.name.Name, dns.rdataset.Rdataset],
rrsig: RRSIG,
- keys: Dict[dns.name.Name, dns.node.Node | dns.rdataset.Rdataset],
+ keys: dict[dns.name.Name, dns.node.Node | dns.rdataset.Rdataset],
origin: dns.name.Name | None = None,
now: float | None = None,
policy: Policy | None = None,
def _validate(
- rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
- rrsigset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
- keys: Dict[dns.name.Name, dns.node.Node | dns.rdataset.Rdataset],
+ rrset: dns.rrset.RRset | tuple[dns.name.Name, dns.rdataset.Rdataset],
+ rrsigset: dns.rrset.RRset | tuple[dns.name.Name, dns.rdataset.Rdataset],
+ keys: dict[dns.name.Name, dns.node.Node | dns.rdataset.Rdataset],
origin: dns.name.Name | None = None,
now: float | None = None,
policy: Policy | None = None,
def _sign(
- rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
+ rrset: dns.rrset.RRset | tuple[dns.name.Name, dns.rdataset.Rdataset],
private_key: PrivateKey,
signer: dns.name.Name,
dnskey: DNSKEY,
def _make_rrsig_signature_data(
- rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
+ rrset: dns.rrset.RRset | tuple[dns.name.Name, dns.rdataset.Rdataset],
rrsig: RRSIG,
origin: dns.name.Name | None = None,
) -> bytes:
def make_ds_rdataset(
- rrset: dns.rrset.RRset | Tuple[dns.name.Name, dns.rdataset.Rdataset],
- algorithms: Set[DSDigest | str],
+ rrset: dns.rrset.RRset | tuple[dns.name.Name, dns.rdataset.Rdataset],
+ algorithms: set[DSDigest | str],
origin: dns.name.Name | None = None,
) -> dns.rdataset.Rdataset:
"""Create a DS record from DNSKEY/CDNSKEY/CDS.
txn: dns.transaction.Transaction,
rrset: dns.rrset.RRset,
signer: dns.name.Name,
- ksks: List[Tuple[PrivateKey, DNSKEY]],
- zsks: List[Tuple[PrivateKey, DNSKEY]],
+ ksks: list[tuple[PrivateKey, DNSKEY]],
+ zsks: list[tuple[PrivateKey, DNSKEY]],
inception: datetime | str | int | float | None = None,
expiration: datetime | str | int | float | None = None,
lifetime: int | None = None,
def sign_zone(
zone: dns.zone.Zone,
txn: dns.transaction.Transaction | None = None,
- keys: List[Tuple[PrivateKey, DNSKEY]] | None = None,
+ keys: list[tuple[PrivateKey, DNSKEY]] | None = None,
add_dnskey: bool = True,
dnskey_ttl: int | None = None,
inception: datetime | str | int | float | None = None,
-from typing import Dict, Tuple, Type
-
import dns._features
import dns.name
from dns.dnssecalgs.base import GenericPrivateKey
AlgorithmPrefix = bytes | dns.name.Name | None
-algorithms: Dict[Tuple[Algorithm, AlgorithmPrefix], Type[GenericPrivateKey]] = {}
+algorithms: dict[tuple[Algorithm, AlgorithmPrefix], type[GenericPrivateKey]] = {}
if _have_cryptography:
# pylint: disable=possibly-used-before-assignment
algorithms.update(
def get_algorithm_cls(
algorithm: int | str, prefix: AlgorithmPrefix = None
-) -> Type[GenericPrivateKey]:
+) -> type[GenericPrivateKey]:
"""Get Private Key class from Algorithm.
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
)
-def get_algorithm_cls_from_dnskey(dnskey: DNSKEY) -> Type[GenericPrivateKey]:
+def get_algorithm_cls_from_dnskey(dnskey: DNSKEY) -> type[GenericPrivateKey]:
"""Get Private Key class from DNSKEY.
*dnskey*, a ``DNSKEY`` to get Algorithm class for.
def register_algorithm_cls(
algorithm: int | str,
- algorithm_cls: Type[GenericPrivateKey],
+ algorithm_cls: type[GenericPrivateKey],
name: dns.name.Name | str | None = None,
oid: bytes | None = None,
) -> None:
from abc import ABC, abstractmethod # pylint: disable=no-name-in-module
-from typing import Any, Type
+from typing import Any
import dns.rdataclass
import dns.rdatatype
class GenericPrivateKey(ABC):
- public_cls: Type[GenericPublicKey]
+ public_cls: type[GenericPublicKey]
@abstractmethod
def __init__(self, key: Any) -> None:
-from typing import Any, Type
+from typing import Any
from cryptography.hazmat.primitives import serialization
class CryptographyPrivateKey(GenericPrivateKey):
key: Any = None
key_cls: Any = None
- public_cls: Type[CryptographyPublicKey] # type: ignore
+ public_cls: type[CryptographyPublicKey] # type: ignore
def __init__(self, key: Any) -> None: # pylint: disable=super-init-not-called
if self.key_cls is None:
-from typing import Type
-
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed448, ed25519
class PrivateEDDSA(CryptographyPrivateKey):
- public_cls: Type[PublicEDDSA] # type: ignore
+ public_cls: type[PublicEDDSA] # type: ignore
def sign(
self,
"""DNS E.164 helpers."""
-from typing import Iterable
+from collections.abc import Iterable
import dns.exception
import dns.name
import math
import socket
import struct
-from typing import Any, Dict
+from typing import Any
import dns.enum
import dns.inet
return cls(parser.get_name())
-_type_to_class: Dict[OptionType, Any] = {
+_type_to_class: dict[OptionType, Any] = {
OptionType.ECS: ECSOption,
OptionType.EDE: EDEOption,
OptionType.NSID: NSIDOption,
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import enum
-from typing import Any, Type, TypeVar
+from typing import Any, TypeVar
TIntEnum = TypeVar("TIntEnum", bound="IntEnum")
raise ValueError(f"{name} must be an int between >= 0 and <= {max}")
@classmethod
- def from_text(cls: Type[TIntEnum], text: str) -> TIntEnum:
+ def from_text(cls: type[TIntEnum], text: str) -> TIntEnum:
text = text.upper()
try:
return cls[text]
raise cls._unknown_exception_class()
@classmethod
- def to_text(cls: Type[TIntEnum], value: int) -> str:
+ def to_text(cls: type[TIntEnum], value: int) -> str:
cls._check_value(value)
try:
text = cls(value).name
return text
@classmethod
- def make(cls: Type[TIntEnum], value: int | str) -> TIntEnum:
+ def make(cls: type[TIntEnum], value: int | str) -> TIntEnum:
"""Convert text or a value into an enumerated type, if possible.
*value*, the ``int`` or ``str`` to convert.
return current_text
@classmethod
- def _unknown_exception_class(cls) -> Type[Exception]:
+ def _unknown_exception_class(cls) -> type[Exception]:
return ValueError
"""
-from typing import Set
-
-
class DNSException(Exception):
"""Abstract base class shared by all dnspython exceptions.
"""
msg: str | None = None # non-parametrized message
- supp_kwargs: Set[str] = set() # accepted parameters for _fmt_kwargs (sanity check)
+ supp_kwargs: set[str] = set() # accepted parameters for _fmt_kwargs (sanity check)
fmt: str | None = None # message parametrized with results from _fmt_kwargs
def __init__(self, *args, **kwargs):
"""DNS GENERATE range conversion."""
-from typing import Tuple
-
import dns.exception
-def from_text(text: str) -> Tuple[int, int, int]:
+def from_text(text: str) -> tuple[int, int, int]:
"""Convert the text form of a range in a ``$GENERATE`` statement to an
integer.
"""Generic Internet address helper functions."""
import socket
-from typing import Any, Tuple
+from typing import Any
import dns.ipv4
import dns.ipv6
return False
-def low_level_address_tuple(high_tuple: Tuple[str, int], af: int | None = None) -> Any:
+def low_level_address_tuple(high_tuple: tuple[str, int], af: int | None = None) -> Any:
"""Given a "high-level" address tuple, i.e.
an (address, port) return the appropriate "low-level" address tuple
suitable for use in socket calls.
import binascii
import re
-from typing import List
import dns.exception
import dns.ipv4
if l > 8:
raise dns.exception.SyntaxError
seen_empty = False
- canonical: List[bytes] = []
+ canonical: list[bytes] = []
for c in chunks:
if c == b"":
if seen_empty:
import enum
import io
import time
-from typing import Any, Dict, List, Tuple, cast
+from typing import Any, cast
import dns.edns
import dns.entropy
DEFAULT_EDNS_PAYLOAD = 1232
MAX_CHAIN = 16
-IndexKeyType = Tuple[
+IndexKeyType = tuple[
int,
dns.name.Name,
dns.rdataclass.RdataClass,
dns.rdatatype.RdataType | None,
dns.rdataclass.RdataClass | None,
]
-IndexType = Dict[IndexKeyType, dns.rrset.RRset]
-SectionType = int | str | List[dns.rrset.RRset]
+IndexType = dict[IndexKeyType, dns.rrset.RRset]
+SectionType = int | str | list[dns.rrset.RRset]
class Message:
else:
self.id = id
self.flags = 0
- self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
+ self.sections: list[list[dns.rrset.RRset]] = [[], [], [], []]
self.opt: dns.rrset.RRset | None = None
self.request_payload = 0
self.pad = 0
self.origin: dns.name.Name | None = None
self.tsig_ctx: Any | None = None
self.index: IndexType = {}
- self.errors: List[MessageError] = []
+ self.errors: list[MessageError] = []
self.time = 0.0
self.wire: bytes | None = None
@property
- def question(self) -> List[dns.rrset.RRset]:
+ def question(self) -> list[dns.rrset.RRset]:
"""The question section."""
return self.sections[0]
self.sections[0] = v
@property
- def answer(self) -> List[dns.rrset.RRset]:
+ def answer(self) -> list[dns.rrset.RRset]:
"""The answer section."""
return self.sections[1]
self.sections[1] = v
@property
- def authority(self) -> List[dns.rrset.RRset]:
+ def authority(self) -> list[dns.rrset.RRset]:
"""The authority section."""
return self.sections[2]
self.sections[2] = v
@property
- def additional(self) -> List[dns.rrset.RRset]:
+ def additional(self) -> list[dns.rrset.RRset]:
"""The additional data section."""
return self.sections[3]
self,
origin: dns.name.Name | None = None,
relativize: bool = True,
- **kw: Dict[str, Any],
+ **kw: dict[str, Any],
) -> str:
"""Convert the message to text.
return False
return True
- def section_number(self, section: List[dns.rrset.RRset]) -> int:
+ def section_number(self, section: list[dns.rrset.RRset]) -> int:
"""Return the "section number" of the specified section for use
in indexing.
return self._section_enum(i)
raise ValueError("unknown section")
- def section_from_number(self, number: int) -> List[dns.rrset.RRset]:
+ def section_from_number(self, number: int) -> list[dns.rrset.RRset]:
"""Return the section list associated with the specified section
number.
tsig_ctx: Any | None = None,
prepend_length: bool = False,
prefer_truncation: bool = False,
- **kw: Dict[str, Any],
+ **kw: dict[str, Any],
) -> bytes:
"""Return a string containing the message in DNS compressed wire
format.
ednsflags: int = 0,
payload: int = DEFAULT_EDNS_PAYLOAD,
request_payload: int | None = None,
- options: List[dns.edns.Option] | None = None,
+ options: list[dns.edns.Option] | None = None,
pad: int = 0,
) -> None:
"""Configure EDNS behavior.
return 0
@property
- def options(self) -> Tuple:
+ def options(self) -> tuple:
if self.opt:
rdata = cast(dns.rdtypes.ANY.OPT.OPT, self.opt[0])
return rdata.options
self.flags &= 0x87FF
self.flags |= dns.opcode.to_flags(opcode)
- def get_options(self, otype: dns.edns.OptionType) -> List[dns.edns.Option]:
+ def get_options(self, otype: dns.edns.OptionType) -> list[dns.edns.Option]:
"""Return the list of options of the specified type."""
return [option for option in self.options if option.otype == otype]
- def extended_errors(self) -> List[dns.edns.EDEOption]:
+ def extended_errors(self) -> list[dns.edns.EDEOption]:
"""Return the list of Extended DNS Error (EDE) options in the message"""
- return cast(List[dns.edns.EDEOption], self.get_options(dns.edns.OptionType.EDE))
+ return cast(list[dns.edns.EDEOption], self.get_options(dns.edns.OptionType.EDE))
def _get_one_rr_per_rrset(self, value):
# What the caller picked is fine.
canonical_name: dns.name.Name,
answer: dns.rrset.RRset | None,
minimum_ttl: int,
- cnames: List[dns.rrset.RRset],
+ cnames: list[dns.rrset.RRset],
):
self.canonical_name = canonical_name
self.answer = answer
ednsflags: int | None = None,
payload: int | None = None,
request_payload: int | None = None,
- options: List[dns.edns.Option] | None = None,
+ options: list[dns.edns.Option] | None = None,
idna_codec: dns.name.IDNACodec | None = None,
id: int | None = None,
flags: int = dns.flags.RD,
# only pass keywords on to use_edns if they have been set to a
# non-None value. Setting a field will turn EDNS on if it hasn't
# been configured.
- kwargs: Dict[str, Any] = {}
+ kwargs: dict[str, Any] = {}
if ednsflags is not None:
kwargs["ednsflags"] = ednsflags
if payload is not None:
import encodings.idna # type: ignore
import functools
import struct
-from typing import Any, Callable, Dict, Iterable, Optional, Tuple
+from collections.abc import Callable, Iterable
+from typing import Any
import dns._features
import dns.enum
have_idna_2008 = False
-CompressType = Dict["Name", int]
+CompressType = dict["Name", int]
class NameRelation(dns.enum.IntEnum):
IDNA_2008 = IDNA_2008_Practical
-def _validate_labels(labels: Tuple[bytes, ...]) -> None:
+def _validate_labels(labels: tuple[bytes, ...]) -> None:
"""Check for empty labels in the middle of a label sequence,
labels that are too long, and for too many labels.
h += (h << 3) + c
return h
- def fullcompare(self, other: "Name") -> Tuple[NameRelation, int, int]:
+ def fullcompare(self, other: "Name") -> tuple[NameRelation, int, int]:
"""Compare two names, returning a 3-tuple
``(relation, order, nlabels)``.
idna_codec = IDNA_2003_Practical
return ".".join([idna_codec.decode(x) for x in l])
- def to_digestable(self, origin: Optional["Name"] = None) -> bytes:
+ def to_digestable(self, origin: "Name | None" = None) -> bytes:
"""Convert name to a format suitable for digesting in hashes.
The name is canonicalized and converted to uncompressed wire
self,
file: Any | None = None,
compress: CompressType | None = None,
- origin: Optional["Name"] = None,
+ origin: "Name | None" = None,
canonicalize: bool = False,
) -> bytes | None:
"""Convert name to wire format, possibly compressing it.
def __sub__(self, other):
return self.relativize(other)
- def split(self, depth: int) -> Tuple["Name", "Name"]:
+ def split(self, depth: int) -> tuple["Name", "Name"]:
"""Split a name into a prefix and suffix names at the specified depth.
*depth* is an ``int`` specifying the number of labels in the suffix
return self
def choose_relativity(
- self, origin: Optional["Name"] = None, relativize: bool = True
+ self, origin: "Name | None" = None, relativize: bool = True
) -> "Name":
"""Return a name with the relativity desired by the caller.
return Name(labels)
-def from_wire(message: bytes, current: int) -> Tuple[Name, int]:
+def from_wire(message: bytes, current: int) -> tuple[Name, int]:
"""Convert possibly compressed wire format into a Name.
*message* is a ``bytes`` containing an entire DNS message in DNS
import enum
import io
-from typing import Any, Dict
+from typing import Any
import dns.immutable
import dns.name
# the set of rdatasets, represented as a list.
self.rdatasets = []
- def to_text(self, name: dns.name.Name, **kw: Dict[str, Any]) -> str:
+ def to_text(self, name: dns.name.Name, **kw: dict[str, Any]) -> str:
"""Convert a node to text format.
Each rdataset at the node is printed. Any keyword arguments
"""DNS Opcodes."""
-from typing import Type
-
import dns.enum
import dns.exception
return 15
@classmethod
- def _unknown_exception_class(cls) -> Type[Exception]:
+ def _unknown_exception_class(cls) -> type[Exception]:
return UnknownOpcode
import struct
import time
import urllib.parse
-from typing import Any, Callable, Dict, Optional, Tuple, cast
+from collections.abc import Callable
+from typing import Any, cast
import dns._features
import dns._tls_util
def _maybe_get_resolver(
- resolver: Optional["dns.resolver.Resolver"], # type: ignore
+ resolver: "dns.resolver.Resolver | None", # type: ignore
) -> "dns.resolver.Resolver": # type: ignore
# We need a separate method for this to avoid overriding the global
# variable "dns" with the as-yet undefined local variable "dns"
post: bool = True,
bootstrap_address: str | None = None,
verify: bool | str | ssl.SSLContext = True,
- resolver: Optional["dns.resolver.Resolver"] = None, # type: ignore
+ resolver: "dns.resolver.Resolver | None" = None, # type: ignore
family: int = socket.AF_UNSPEC,
http_version: HTTPVersion = HTTPVersion.DEFAULT,
) -> dns.message.Message:
what: dns.message.Message | bytes,
destination: Any,
expiration: float | None = None,
-) -> Tuple[int, float]:
+) -> tuple[int, float]:
"""Send a DNS message to the specified UDP socket.
*sock*, a ``socket``.
expiration: float | None = None,
ignore_unexpected: bool = False,
one_rr_per_rrset: bool = False,
- keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ keyring: dict[dns.name.Name, dns.tsig.Key] | None = None,
request_mac: bytes | None = b"",
ignore_trailing: bool = False,
raise_on_truncation: bool = False,
udp_sock: Any | None = None,
tcp_sock: Any | None = None,
ignore_errors: bool = False,
-) -> Tuple[dns.message.Message, bool]:
+) -> tuple[dns.message.Message, bool]:
"""Return the response to the query, trying UDP first and falling back
to TCP if UDP results in a truncated response.
sock: Any,
what: dns.message.Message | bytes,
expiration: float | None = None,
-) -> Tuple[int, float]:
+) -> tuple[int, float]:
"""Send a DNS message to the specified TCP socket.
*sock*, a ``socket``.
sock: Any,
expiration: float | None = None,
one_rr_per_rrset: bool = False,
- keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ keyring: dict[dns.name.Name, dns.tsig.Key] | None = None,
request_mac: bytes | None = b"",
ignore_trailing: bool = False,
-) -> Tuple[dns.message.Message, float]:
+) -> tuple[dns.message.Message, float]:
"""Read a DNS message from a TCP socket.
*sock*, a ``socket``.
q.id = 0
wire = q.to_wire()
the_connection: dns.quic.SyncQuicConnection
- the_manager: dns.quic.SyncQuicManager # type: ignore
+ the_manager: dns.quic.SyncQuicManager # type: ignore
if connection:
manager: contextlib.AbstractContextManager = contextlib.nullcontext(None)
the_connection = connection
rdclass: dns.rdataclass.RdataClass | str = dns.rdataclass.IN,
timeout: float | None = None,
port: int = 53,
- keyring: Dict[dns.name.Name, dns.tsig.Key] | None = None,
+ keyring: dict[dns.name.Name, dns.tsig.Key] | None = None,
keyname: dns.name.Name | str | None = None,
relativize: bool = True,
lifetime: float | None = None,
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
-from typing import Any, Dict, List, Tuple
+from typing import Any
import dns._features
import dns.asyncbackend
# We have a context factory and a manager factory as for trio we need to have
# a nursery.
- _async_factories: Dict[str, Tuple[Any, Any]] = {
+ _async_factories: dict[str, tuple[Any, Any]] = {
"asyncio": (null_factory, _asyncio_manager_factory)
}
raise NotImplementedError
-Headers = List[Tuple[bytes, bytes]]
+Headers = list[tuple[bytes, bytes]]
import struct
import time
-import aioquic.h3.connection # type: ignore
import aioquic.h3.events # type: ignore
-import aioquic.quic.configuration # type: ignore
-import aioquic.quic.connection # type: ignore
import aioquic.quic.events # type: ignore
import dns.asyncbackend
import threading
import time
-import aioquic.h3.connection # type: ignore
import aioquic.h3.events # type: ignore
-import aioquic.quic.configuration # type: ignore
-import aioquic.quic.connection # type: ignore
import aioquic.quic.events # type: ignore
import dns.exception
import struct
import time
-import aioquic.h3.connection # type: ignore
import aioquic.h3.events # type: ignore
-import aioquic.quic.configuration # type: ignore
-import aioquic.quic.connection # type: ignore
import aioquic.quic.events # type: ignore
import trio
"""DNS Result Codes."""
-from typing import Tuple, Type
-
import dns.enum
import dns.exception
return 4095
@classmethod
- def _unknown_exception_class(cls) -> Type[Exception]:
+ def _unknown_exception_class(cls) -> type[Exception]:
return UnknownRcode
return Rcode.make(value)
-def to_flags(value: Rcode) -> Tuple[int, int]:
+def to_flags(value: Rcode) -> tuple[int, int]:
"""Return a (flags, ednsflags) tuple which encodes the rcode.
*value*, a ``dns.rcode.Rcode``, the rcode.
import itertools
import random
from importlib import import_module
-from typing import Any, Dict, Tuple
+from typing import Any
import dns.exception
import dns.immutable
self,
origin: dns.name.Name | None = None,
relativize: bool = True,
- **kw: Dict[str, Any],
+ **kw: dict[str, Any],
) -> str:
"""Convert an rdata to text format.
self,
origin: dns.name.Name | None = None,
relativize: bool = True,
- **kw: Dict[str, Any],
+ **kw: dict[str, Any],
) -> str:
return rf"\# {len(self.data)} " + _hexify(self.data, **kw) # type: ignore
return cls(rdclass, rdtype, parser.get_remaining())
-_rdata_classes: Dict[Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any] = (
+_rdata_classes: dict[tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any] = (
{}
)
_module_prefix = "dns.rdtypes"
import io
import random
import struct
-from typing import Any, Collection, Dict, List, cast
+from collections.abc import Collection
+from typing import Any, cast
import dns.exception
import dns.immutable
relativize: bool = True,
override_rdclass: dns.rdataclass.RdataClass | None = None,
want_comments: bool = False,
- **kw: Dict[str, Any],
+ **kw: dict[str, Any],
) -> str:
"""Convert the rdataset into DNS zone file format.
file.write(struct.pack("!HHIH", self.rdtype, rdclass, 0, 0))
return 1
else:
- l: Rdataset | List[dns.rdata.Rdata]
+ l: Rdataset | list[dns.rdata.Rdata]
if want_shuffle:
l = list(self)
random.shuffle(l)
return True
return False
- def processing_order(self) -> List[dns.rdata.Rdata]:
+ def processing_order(self) -> list[dns.rdata.Rdata]:
"""Return rdatas in a valid processing order according to the type's
specification. For example, MX records are in preference order from
lowest to highest preferences, with items of the same preference
"""DNS Rdata Types."""
-from typing import Dict
-
import dns.enum
import dns.exception
return UnknownRdatatype
-_registered_by_text: Dict[str, RdataType] = {}
-_registered_by_value: Dict[RdataType, str] = {}
+_registered_by_text: dict[str, RdataType] = {}
+_registered_by_value: dict[RdataType, str] = {}
_metatypes = {RdataType.OPT}
import base64
import enum
import struct
-from typing import Any, Dict
+from typing import Any
import dns.enum
import dns.exception
import dns.immutable
import dns.ipv4
import dns.ipv6
-import dns.name
import dns.rdata
import dns.rdtypes.util
import dns.renderer
raise NotImplementedError # pragma: no cover
-_class_for_key: Dict[ParamKey, Any] = {
+_class_for_key: dict[ParamKey, Any] = {
ParamKey.MANDATORY: MandatoryParam,
ParamKey.ALPN: ALPNParam,
ParamKey.NO_DEFAULT_ALPN: NoDefaultALPNParam,
"""TXT-like base class."""
-from typing import Any, Dict, Iterable, Tuple
+from collections.abc import Iterable
+from typing import Any
import dns.exception
import dns.immutable
*strings*, a tuple of ``bytes``
"""
super().__init__(rdclass, rdtype)
- self.strings: Tuple[bytes] = self._as_tuple(
+ self.strings: tuple[bytes] = self._as_tuple(
strings, lambda x: self._as_bytes(x, True, 255)
)
if len(self.strings) == 0:
self,
origin: dns.name.Name | None = None,
relativize: bool = True,
- **kw: Dict[str, Any],
+ **kw: dict[str, Any],
) -> str:
txt = ""
prefix = ""
import collections
import random
import struct
-from typing import Any, Iterable, List, Tuple
+from collections.abc import Iterable
+from typing import Any
import dns.exception
import dns.ipv4
type_name = ""
- def __init__(self, windows: Iterable[Tuple[int, bytes]] | None = None):
+ def __init__(self, windows: Iterable[tuple[int, bytes]] | None = None):
last_window = -1
if windows is None:
windows = []
return cls.from_rdtypes(rdtypes)
@classmethod
- def from_rdtypes(cls, rdtypes: List[dns.rdatatype.RdataType]) -> "Bitmap":
+ def from_rdtypes(cls, rdtypes: list[dns.rdatatype.RdataType]) -> "Bitmap":
rdtypes = sorted(rdtypes)
window = 0
octets = 0
import threading
import time
import warnings
-from typing import Any, Dict, Iterator, List, Sequence, Tuple, cast
+from collections.abc import Iterator, Sequence
+from typing import Any, cast
from urllib.parse import urlparse
import dns._ddr
"""The DNS query name is too long after DNAME substitution."""
-ErrorTuple = Tuple[
+ErrorTuple = tuple[
str | None,
bool,
int,
]
-def _errors_to_text(errors: List[ErrorTuple]) -> List[str]:
+def _errors_to_text(errors: list[ErrorTuple]) -> list[str]:
"""Turn a resolution errors trace into a list of text."""
texts = []
for err in errors:
# filtering by address family.
def addresses_and_families(
self, family: int = socket.AF_UNSPEC
- ) -> Iterator[Tuple[str, int]]:
+ ) -> Iterator[tuple[str, int]]:
if family == socket.AF_UNSPEC:
yield from self.addresses_and_families(socket.AF_INET6)
yield from self.addresses_and_families(socket.AF_INET)
return self.statistics.clone()
-CacheKey = Tuple[dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass]
+CacheKey = tuple[dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass]
class Cache(CacheBase):
"""
super().__init__()
- self.data: Dict[CacheKey, Answer] = {}
+ self.data: dict[CacheKey, Answer] = {}
self.cleaning_interval = cleaning_interval
self.next_cleaning: float = time.time() + self.cleaning_interval
"""
super().__init__()
- self.data: Dict[CacheKey, LRUCacheNode] = {}
+ self.data: dict[CacheKey, LRUCacheNode] = {}
self.set_max_size(max_size)
self.sentinel: LRUCacheNode = LRUCacheNode(None, None)
self.sentinel.prev = self.sentinel
self.rdclass = rdclass
self.tcp = tcp
self.raise_on_no_answer = raise_on_no_answer
- self.nxdomain_responses: Dict[dns.name.Name, dns.message.QueryMessage] = {}
+ self.nxdomain_responses: dict[dns.name.Name, dns.message.QueryMessage] = {}
# Initialize other things to help analysis tools
self.qname = dns.name.empty
- self.nameservers: List[dns.nameserver.Nameserver] = []
- self.current_nameservers: List[dns.nameserver.Nameserver] = []
- self.errors: List[ErrorTuple] = []
+ self.nameservers: list[dns.nameserver.Nameserver] = []
+ self.current_nameservers: list[dns.nameserver.Nameserver] = []
+ self.errors: list[ErrorTuple] = []
self.nameserver: dns.nameserver.Nameserver | None = None
self.tcp_attempt = False
self.retry_with_tcp = False
def next_request(
self,
- ) -> Tuple[dns.message.QueryMessage | None, Answer | None]:
+ ) -> tuple[dns.message.QueryMessage | None, Answer | None]:
"""Get the next request to send, and check the cache.
Returns a (request, answer) tuple. At most one of request or
#
raise NXDOMAIN(qnames=self.qnames_to_try, responses=self.nxdomain_responses)
- def next_nameserver(self) -> Tuple[dns.nameserver.Nameserver, bool, float]:
+ def next_nameserver(self) -> tuple[dns.nameserver.Nameserver, bool, float]:
if self.retry_with_tcp:
assert self.nameserver is not None
assert not self.nameserver.is_always_max_size()
def query_result(
self, response: dns.message.Message | None, ex: Exception | None
- ) -> Tuple[Answer | None, bool]:
+ ) -> tuple[Answer | None, bool]:
#
# returns an (answer: Answer, end_loop: bool) tuple.
#
# pylint: disable=attribute-defined-outside-init
domain: dns.name.Name
- nameserver_ports: Dict[str, int]
+ nameserver_ports: dict[str, int]
port: int
- search: List[dns.name.Name]
+ search: list[dns.name.Name]
use_search_by_default: bool
timeout: float
lifetime: float
keyalgorithm: dns.name.Name | str
edns: int
ednsflags: int
- ednsoptions: List[dns.edns.Option] | None
+ ednsoptions: list[dns.edns.Option] | None
payload: int
cache: Any
flags: int | None
self,
start: float,
lifetime: float | None = None,
- errors: List[ErrorTuple] | None = None,
+ errors: list[ErrorTuple] | None = None,
) -> float:
lifetime = self.lifetime if lifetime is None else lifetime
now = time.time()
def _get_qnames_to_try(
self, qname: dns.name.Name, search: bool | None
- ) -> List[dns.name.Name]:
+ ) -> list[dns.name.Name]:
# This is a separate method so we can unit test the search
# rules without requiring the Internet.
if search is None:
edns: int | bool | None = 0,
ednsflags: int = 0,
payload: int = dns.message.DEFAULT_EDNS_PAYLOAD,
- options: List[dns.edns.Option] | None = None,
+ options: list[dns.edns.Option] | None = None,
) -> None:
"""Configure EDNS behavior.
def _enrich_nameservers(
cls,
nameservers: Sequence[str | dns.nameserver.Nameserver],
- nameserver_ports: Dict[str, int],
+ nameserver_ports: dict[str, int],
default_port: int,
- ) -> List[dns.nameserver.Nameserver]:
+ ) -> list[dns.nameserver.Nameserver]:
enriched_nameservers = []
if isinstance(nameservers, list | tuple):
for nameserver in nameservers:
# We make a modified kwargs for type checking happiness, as otherwise
# we get a legit warning about possibly having rdtype and rdclass
# in the kwargs more than once.
- modified_kwargs: Dict[str, Any] = {}
+ modified_kwargs: dict[str, Any] = {}
modified_kwargs.update(kwargs)
modified_kwargs["rdtype"] = dns.rdatatype.PTR
modified_kwargs["rdclass"] = dns.rdataclass.IN
# We make a modified kwargs for type checking happiness, as otherwise
# we get a legit warning about possibly having rdtype and rdclass
# in the kwargs more than once.
- modified_kwargs: Dict[str, Any] = {}
+ modified_kwargs: dict[str, Any] = {}
modified_kwargs.update(kwargs)
modified_kwargs.pop("rdtype", None)
modified_kwargs["rdclass"] = dns.rdataclass.IN
"""
if resolver is None:
resolver = get_default_resolver()
- nameservers: List[str | dns.nameserver.Nameserver] = []
+ nameservers: list[str | dns.nameserver.Nameserver] = []
if isinstance(where, str) and dns.inet.is_address(where):
nameservers.append(dns.nameserver.Do53Nameserver(where, port))
else:
# running process.
#
-_protocols_for_socktype: Dict[Any, List[Any]] = {
+_protocols_for_socktype: dict[Any, list[Any]] = {
socket.SOCK_DGRAM: [socket.SOL_UDP],
socket.SOCK_STREAM: [socket.SOL_TCP],
}
"""DNS RRsets (an RRset is a named rdataset)"""
-from typing import Any, Collection, Dict, cast
+from collections.abc import Collection
+from typing import Any, cast
import dns.name
import dns.rdata
import dns.rdataclass
import dns.rdataset
import dns.rdatatype
-import dns.renderer
class RRset(dns.rdataset.Rdataset):
self,
origin: dns.name.Name | None = None,
relativize: bool = True,
- **kw: Dict[str, Any],
+ **kw: dict[str, Any],
) -> str:
"""Convert the RRset into DNS zone file format.
file: Any,
compress: dns.name.CompressType | None = None, # type: ignore
origin: dns.name.Name | None = None,
- **kw: Dict[str, Any],
+ **kw: dict[str, Any],
) -> int:
"""Convert the RRset to wire format.
import io
import sys
-from typing import Any, List, Tuple
+from typing import Any
import dns.exception
import dns.name
self.ungotten_char = None
return c
- def where(self) -> Tuple[str, int]:
+ def where(self) -> tuple[str, int]:
"""Return the current location in the input.
Returns a (string, int) tuple. The first item is the filename of
Returns an int.
"""
-
return self.as_uint8(self.get().unescape())
def get_uint16(self, base: int = 10) -> int:
return self.as_identifier(self.get().unescape())
- def get_remaining(self, max_tokens: int | None = None) -> List[Token]:
+ def get_remaining(self, max_tokens: int | None = None) -> list[Token]:
"""Return the remaining tokens on the line, until an EOL or EOF is seen.
max_tokens: If not None, stop after this number of tokens.
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import collections
-from typing import Any, Callable, Iterator, List, Tuple
+from collections.abc import Callable, Iterator
+from typing import Any
import dns.exception
import dns.name
def origin_information(
self,
- ) -> Tuple[dns.name.Name | None, bool, dns.name.Name | None]:
+ ) -> tuple[dns.name.Name | None, bool, dns.name.Name | None]:
"""Returns a tuple
(absolute_origin, relativize, effective_origin)
self.replacement = replacement
self.read_only = read_only
self._ended = False
- self._check_put_rdataset: List[CheckPutRdatasetType] = []
- self._check_delete_rdataset: List[CheckDeleteRdatasetType] = []
- self._check_delete_name: List[CheckDeleteNameType] = []
+ self._check_put_rdataset: list[CheckPutRdatasetType] = []
+ self._check_delete_rdataset: list[CheckDeleteRdatasetType] = []
+ self._check_delete_name: list[CheckDeleteNameType] = []
#
# This is the high level API
#
# Note that we currently use non-immutable types in the return type signature to
- # avoid covariance problems, e.g. if the caller has a List[Rdataset], mypy will be
+ # avoid covariance problems, e.g. if the caller has a list[Rdataset], mypy will be
# unhappy if we return an ImmutableRdataset.
def get(
def iterate_rdatasets(
self,
- ) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]:
+ ) -> Iterator[tuple[dns.name.Name, dns.rdataset.Rdataset]]:
"""Iterate all the rdatasets in the transaction, returning
(`dns.name.Name`, `dns.rdataset.Rdataset`) tuples.
"""A place to store TSIG keys."""
import base64
-from typing import Any, Dict
+from typing import Any
import dns.name
import dns.tsig
-def from_text(textring: Dict[str, Any]) -> Dict[dns.name.Name, Any]:
+def from_text(textring: dict[str, Any]) -> dict[dns.name.Name, Any]:
"""Convert a dictionary containing (textual DNS name, base64 secret)
pairs into a binary keyring which has (dns.name.Name, bytes) pairs, or
a dictionary containing (textual DNS name, (algorithm, base64 secret))
pairs into a binary keyring which has (dns.name.Name, dns.tsig.Key) pairs.
@rtype: dict"""
- keyring: Dict[dns.name.Name, Any] = {}
+ keyring: dict[dns.name.Name, Any] = {}
for name, value in textring.items():
kname = dns.name.from_text(name)
if isinstance(value, str):
return keyring
-def to_text(keyring: Dict[dns.name.Name, Any]) -> Dict[str, Any]:
+def to_text(keyring: dict[dns.name.Name, Any]) -> dict[str, Any]:
"""Convert a dictionary containing (dns.name.Name, dns.tsig.Key) pairs
into a text keyring which has (textual DNS name, (textual algorithm,
base64 secret)) pairs, or a dictionary containing (dns.name.Name, bytes)
"""DNS Dynamic Update Support"""
-from typing import Any, List
+from typing import Any
import dns.enum
import dns.exception
self.use_tsig(keyring, keyname, algorithm=keyalgorithm)
@property
- def zone(self) -> List[dns.rrset.RRset]:
+ def zone(self) -> list[dns.rrset.RRset]:
"""The zone section."""
return self.sections[0]
self.sections[0] = v
@property
- def prerequisite(self) -> List[dns.rrset.RRset]:
+ def prerequisite(self) -> list[dns.rrset.RRset]:
"""The prerequisite section."""
return self.sections[1]
self.sections[1] = v
@property
- def update(self) -> List[dns.rrset.RRset]:
+ def update(self) -> list[dns.rrset.RRset]:
"""The update section."""
return self.sections[2]
import collections
import threading
-from typing import Callable, Deque, Set, cast
+from collections.abc import Callable
+from typing import cast
import dns.exception
import dns.name
the default policy, which retains one version is used.
"""
super().__init__(origin, rdclass, relativize)
- self._versions: Deque[Version] = collections.deque()
+ self._versions: collections.deque[Version] = collections.deque()
self._version_lock = threading.Lock()
if pruning_policy is None:
self._pruning_policy = self._default_pruning_policy
self._pruning_policy = pruning_policy
self._write_txn: Transaction | None = None
self._write_event: threading.Event | None = None
- self._write_waiters: Deque[threading.Event] = collections.deque()
- self._readers: Set[Transaction] = set()
+ self._write_waiters: collections.deque[threading.Event] = collections.deque()
+ self._readers: set[Transaction] = set()
self._commit_version_unlocked(
None, WritableVersion(self, replacement=True), origin
)
import contextlib
import struct
-from typing import Iterator, Optional, Tuple
+from collections.abc import Iterator
import dns.exception
import dns.name
def get_uint48(self) -> int:
return int.from_bytes(self.get_bytes(6), "big")
- def get_struct(self, format: str) -> Tuple:
+ def get_struct(self, format: str) -> tuple:
return struct.unpack(format, self.get_bytes(struct.calcsize(format)))
- def get_name(self, origin: Optional["dns.name.Name"] = None) -> "dns.name.Name":
+ def get_name(self, origin: "dns.name.Name | None" = None) -> "dns.name.Name":
name = dns.name.from_wire_parser(self)
if origin:
name = name.relativize(origin)
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-from typing import Any, List, Tuple, cast
+from typing import Any, cast
import dns.edns
import dns.exception
import dns.rdatatype
import dns.rdtypes
import dns.rdtypes.ANY
-import dns.rdtypes.ANY.SMIMEA
import dns.rdtypes.ANY.SOA
-import dns.rdtypes.svcbbase
import dns.serial
import dns.transaction
import dns.tsig
-import dns.zone
class TransferError(dns.exception.DNSException):
ednsflags: int | None = None,
payload: int | None = None,
request_payload: int | None = None,
- options: List[dns.edns.Option] | None = None,
+ options: list[dns.edns.Option] | None = None,
keyring: Any = None,
keyname: dns.name.Name | None = None,
keyalgorithm: dns.name.Name | str = dns.tsig.default_algorithm,
-) -> Tuple[dns.message.QueryMessage, int | None]:
+) -> tuple[dns.message.QueryMessage, int | None]:
"""Make an AXFR or IXFR query.
*txn_manager* is a ``dns.transaction.TransactionManager``, typically a
import io
import os
import struct
-from typing import (
- Any,
- Callable,
- Iterable,
- Iterator,
- List,
- MutableMapping,
- Set,
- Tuple,
- cast,
-)
+from collections.abc import Callable, Iterable, Iterator, MutableMapping
+from typing import Any, cast
import dns.exception
import dns.immutable
self,
rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.ANY,
covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
- ) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]:
+ ) -> Iterator[tuple[dns.name.Name, dns.rdataset.Rdataset]]:
"""Return a generator which yields (name, rdataset) tuples for
all rdatasets in the zone which have the specified *rdtype*
and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
self,
rdtype: dns.rdatatype.RdataType | str = dns.rdatatype.ANY,
covers: dns.rdatatype.RdataType | str = dns.rdatatype.NONE,
- ) -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]:
+ ) -> Iterator[tuple[dns.name.Name, int, dns.rdata.Rdata]]:
"""Return a generator which yields (name, ttl, rdata) tuples for
all rdatas in the zone which have the specified *rdtype*
and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
def verify_digest(
self, zonemd: dns.rdtypes.ANY.ZONEMD.ZONEMD | None = None
) -> None:
- digests: dns.rdataset.Rdataset | List[dns.rdtypes.ANY.ZONEMD.ZONEMD]
+ digests: dns.rdataset.Rdataset | list[dns.rdtypes.ANY.ZONEMD.ZONEMD]
if zonemd:
digests = [zonemd]
else:
def origin_information(
self,
- ) -> Tuple[dns.name.Name | None, bool, dns.name.Name | None]:
+ ) -> tuple[dns.name.Name | None, bool, dns.name.Name | None]:
effective: dns.name.Name | None
if self.relativize:
effective = dns.name.empty
# We have to copy the zone origin as it may be None in the first
# version, and we don't want to mutate the zone until we commit.
self.origin = zone.origin
- self.changed: Set[dns.name.Name] = set()
+ self.changed: set[dns.name.Name] = set()
def _maybe_cow_with_name(
self, name: dns.name.Name
- ) -> Tuple[dns.node.Node, dns.name.Name]:
+ ) -> tuple[dns.node.Node, dns.name.Name]:
name = self._validate_name(name)
node = self.nodes.get(name)
if node is None or name not in self.changed:
def _origin_information(
self,
- ) -> Tuple[dns.name.Name | None, bool, dns.name.Name | None]:
+ ) -> tuple[dns.name.Name | None, bool, dns.name.Name | None]:
assert self.version is not None
(absolute, relativize, effective) = self.manager.origin_information()
if absolute is None and self.version.origin is not None:
import re
import sys
-from typing import Any, Iterable, List, Set, Tuple, cast
+from collections.abc import Iterable
+from typing import Any, cast
import dns.exception
import dns.grange
# adding the rdataset is ok
-SavedStateType = Tuple[
+SavedStateType = tuple[
dns.tokenizer.Tokenizer,
dns.name.Name | None, # current_origin
dns.name.Name | None, # last_name
self.last_name = self.current_origin
self.zone_rdclass = rdclass
self.txn = txn
- self.saved_state: List[SavedStateType] = []
+ self.saved_state: list[SavedStateType] = []
self.current_file: Any | None = None
- self.allowed_directives: Set[str]
+ self.allowed_directives: set[str]
if allow_directives is True:
self.allowed_directives = {"$GENERATE", "$ORIGIN", "$TTL"}
if allow_include:
self.txn.add(name, ttl, rd)
- def _parse_modify(self, side: str) -> Tuple[str, str, int, int, str]:
+ def _parse_modify(self, side: str) -> tuple[str, str, int, int, str]:
# Here we catch everything in '{' '}' in a group so we can replace it
# with ''.
is_generate1 = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+),(.)}).*$")
self.origin = origin
self.relativize = relativize
self.rdclass = rdclass
- self.rrsets: List[dns.rrset.RRset] = []
+ self.rrsets: list[dns.rrset.RRset] = []
def reader(self): # pragma: no cover
raise NotImplementedError
effective = self.origin
return (self.origin, self.relativize, effective)
- def set_rrsets(self, rrsets: List[dns.rrset.RRset]) -> None:
+ def set_rrsets(self, rrsets: list[dns.rrset.RRset]) -> None:
self.rrsets = rrsets
idna_codec: dns.name.IDNACodec | None = None,
origin: dns.name.Name | str | None = dns.name.root,
relativize: bool = False,
-) -> List[dns.rrset.RRset]:
+) -> list[dns.rrset.RRset]:
"""Read one or more rrsets from the specified text, possibly subject
to restrictions.
import dns.ipv4
import dns.zone
-reverse_map = {} # type: Dict[str, List[str]]
+reverse_map = {} # type: dict[str, list[str]]
for filename in sys.argv[1:]:
zone = dns.zone.from_file(filename, os.path.basename(filename), relativize=False)
def __init__(self, listener, cid, peer, retry_cid=None):
self.original_cid: bytes = cid
self.listener = listener
- self.cids: Set[bytes] = set()
+ self.cids: set[bytes] = set()
self.cids.add(cid)
self.listener.connections[cid] = self
self.peer = peer
self.worker_scope = None
self.streams = {}
- def get_timer_values(self, now: float) -> Tuple[float, float]:
+ def get_timer_values(self, now: float) -> tuple[float, float]:
expiration = self.quic_connection.get_timer()
if expiration is None:
expiration = now + 3600 # arbitrary "big" value
def testToWire1(self):
n = dns.name.from_text("FOO.bar")
f = BytesIO()
- compress = {} # type: Dict[dns.name.Name,int]
+ compress = {} # type: dict[dns.name.Name,int]
n.to_wire(f, compress)
self.assertEqual(f.getvalue(), b"\x03FOO\x03bar\x00")
def testToWire2(self):
n = dns.name.from_text("FOO.bar")
f = BytesIO()
- compress = {} # type: Dict[dns.name.Name,int]
+ compress = {} # type: dict[dns.name.Name,int]
n.to_wire(f, compress)
n.to_wire(f, compress)
self.assertEqual(f.getvalue(), b"\x03FOO\x03bar\x00\xc0\x00")
n1 = dns.name.from_text("FOO.bar")
n2 = dns.name.from_text("foo.bar")
f = BytesIO()
- compress = {} # type: Dict[dns.name.Name,int]
+ compress = {} # type: dict[dns.name.Name,int]
n1.to_wire(f, compress)
n2.to_wire(f, compress)
self.assertEqual(f.getvalue(), b"\x03FOO\x03bar\x00\xc0\x00")
n1 = dns.name.from_text("FOO.bar")
n2 = dns.name.from_text("a.foo.bar")
f = BytesIO()
- compress = {} # type: Dict[dns.name.Name,int]
+ compress = {} # type: dict[dns.name.Name,int]
n1.to_wire(f, compress)
n2.to_wire(f, compress)
self.assertEqual(f.getvalue(), b"\x03FOO\x03bar\x00\x01\x61\xc0\x00")
n1 = dns.name.from_text("FOO.bar")
n2 = dns.name.from_text("a.foo.bar")
f = BytesIO()
- compress = {} # type: Dict[dns.name.Name,int]
+ compress = {} # type: dict[dns.name.Name,int]
n1.to_wire(f, compress)
n2.to_wire(f, None)
self.assertEqual(f.getvalue(), b"\x03FOO\x03bar\x00\x01\x61\x03foo\x03bar\x00")
def bad():
n = dns.name.from_text("FOO.bar", None)
f = BytesIO()
- compress = {} # type: Dict[dns.name.Name,int]
+ compress = {} # type: dict[dns.name.Name,int]
n.to_wire(f, compress)
self.assertRaises(dns.name.NeedAbsoluteNameOrOrigin, bad)
def testGiantCompressionTable(self):
# Only the first 16KiB of a message can have compression pointers.
f = BytesIO()
- compress = {} # type: Dict[dns.name.Name,int]
+ compress = {} # type: dict[dns.name.Name,int]
# exactly 16 bytes encoded
n = dns.name.from_text("0000000000.com.")
n.to_wire(f, compress)