async def receive_udp(sock: dns.asyncbackend.DatagramSocket,
destination: Optional[Any]=None, expiration: Optional[float]=None,
- ignore_unexpected=False, one_rr_per_rrset=False,
+ ignore_unexpected: bool=False, one_rr_per_rrset: bool=False,
keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]]=None, request_mac=b'',
- ignore_trailing=False, raise_on_truncation=False) -> Any:
+ ignore_trailing: bool=False, raise_on_truncation: bool=False) -> Any:
"""Read a DNS message from a UDP socket.
*sock*, a ``dns.asyncbackend.DatagramSocket``.
raise_on_truncation=raise_on_truncation)
return (r, received_time, from_address)
-async def udp(q: dns.message.Message, where: str, timeout: Optional[float]=None, port=53,
- source: Optional[str]=None, source_port=0,
- ignore_unexpected=False, one_rr_per_rrset=False, ignore_trailing=False,
- raise_on_truncation=False, sock: Optional[dns.asyncbackend.DatagramSocket]=None,
+async def udp(q: dns.message.Message, where: str, timeout: Optional[float]=None, port: int=53,
+ source: Optional[str]=None, source_port: int=0,
+ ignore_unexpected: bool=False, one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
+ raise_on_truncation: bool=False, sock: Optional[dns.asyncbackend.DatagramSocket]=None,
backend: Optional[dns.asyncbackend.Backend]=None) -> dns.message.Message:
"""Return the response obtained after sending a query via UDP.
if not sock and s:
await s.close()
-async def udp_with_fallback(q: dns.message.Message, where: str, timeout: Optional[float]=None, port=53,
- source: Optional[str]=None, source_port=0,
- ignore_unexpected=False, one_rr_per_rrset=False, ignore_trailing=False,
+async def udp_with_fallback(q: dns.message.Message, where: str, timeout: Optional[float]=None, port: int=53,
+ source: Optional[str]=None, source_port: int=0,
+ ignore_unexpected: bool=False, one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
udp_sock: Optional[dns.asyncbackend.DatagramSocket]=None,
tcp_sock: Optional[dns.asyncbackend.StreamSocket]=None,
backend: Optional[dns.asyncbackend.Backend]=None) -> Tuple[dns.message.Message, bool]:
async def receive_tcp(sock: dns.asyncbackend.StreamSocket,
- expiration: Optional[float]=None, one_rr_per_rrset=False,
+ expiration: Optional[float]=None, one_rr_per_rrset: bool=False,
keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]]=None,
- request_mac=b'', ignore_trailing=False) -> Tuple[dns.message.Message, float]:
+ request_mac=b'', ignore_trailing: bool=False) -> Tuple[dns.message.Message, float]:
"""Read a DNS message from a TCP socket.
*sock*, a ``dns.asyncbackend.StreamSocket``.
return (r, received_time)
-async def tcp(q: dns.message.Message, where: str, timeout: Optional[float]=None, port=53,
- source: Optional[str]=None, source_port=0,
- one_rr_per_rrset=False, ignore_trailing=False,
+async def tcp(q: dns.message.Message, where: str, timeout: Optional[float]=None, port: int=53,
+ source: Optional[str]=None, source_port: int=0,
+ one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
sock: Optional[dns.asyncbackend.StreamSocket]=None,
backend: Optional[dns.asyncbackend.Backend]=None) -> dns.message.Message:
"""Return the response obtained after sending a query via TCP.
await s.close()
async def tls(q: dns.message.Message, where: str, timeout: Optional[float]=None,
- port=853, source: Optional[str]=None, source_port=0,
- one_rr_per_rrset=False, ignore_trailing=False,
+ port: int=853, source: Optional[str]=None, source_port: int=0,
+ one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
sock: Optional[dns.asyncbackend.StreamSocket]=None,
backend: Optional[dns.asyncbackend.Backend]=None,
ssl_context: Optional[ssl.SSLContext]=None,
await s.close()
async def https(q: dns.message.Message, where: str, timeout: Optional[float]=None,
- port=443, source: Optional[str]=None, source_port=0,
- one_rr_per_rrset=False, ignore_trailing=False,
+ port: int=443, source: Optional[str]=None, source_port: int=0,
+ one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
client: Optional[httpx.AsyncClient]=None,
- path='/dns-query', post=True, verify=True):
+ path: str='/dns-query', post: bool=True, verify: bool=True) -> dns.message.Message:
"""Return the response obtained after sending a query via DNS-over-HTTPS.
*client*, a ``httpx.AsyncClient``. If provided, the client to use for
async def inbound_xfr(where: str, txn_manager: dns.transaction.TransactionManager,
query: Optional[dns.message.Message]=None,
- port=53, timeout: Optional[float]=None, lifetime: Optional[float]=None,
- source: Optional[str]=None, source_port=0, udp_mode=UDPMode.NEVER,
- backend: Optional[dns.asyncbackend.Backend]=None):
+ port: int=53, timeout: Optional[float]=None, lifetime: Optional[float]=None,
+ source: Optional[str]=None, source_port: int=0, udp_mode=UDPMode.NEVER,
+ backend: Optional[dns.asyncbackend.Backend]=None) -> None:
"""Conduct an inbound transfer and apply it via a transaction from the
txn_manager.
import dns.exception
import dns.name
import dns.query
+import dns.rdataclass
+import dns.rdatatype
import dns.resolver # lgtm[py/import-and-import-from]
# import some resolver symbols for brevity
"""Asynchronous DNS stub resolver."""
async def resolve(self, qname: Union[dns.name.Name, str],
- rdtype=dns.rdatatype.A,
- rdclass=dns.rdataclass.IN,
- tcp=False, source: Optional[str]=None,
- raise_on_no_answer=True, source_port=0,
+ rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.A,
+ rdclass: Union[dns.rdataclass.RdataClass, str]=dns.rdataclass.IN,
+ tcp: bool=False, source: Optional[str]=None,
+ raise_on_no_answer: bool=True, source_port: int=0,
lifetime: Optional[float]=None, search: Optional[bool]=None,
backend: Optional[dns.asyncbackend.Backend]=None) -> dns.resolver.Answer:
"""Query nameservers asynchronously to find the answer to the question.
return default_resolver
-def reset_default_resolver():
+def reset_default_resolver() -> None:
"""Re-initialize default asynchronous resolver.
Note that the resolver configuration (i.e. /etc/resolv.conf on UNIX
async def resolve(qname: Union[dns.name.Name, str],
- rdtype=dns.rdatatype.A,
- rdclass=dns.rdataclass.IN,
- tcp=False, source: Optional[str]=None,
- raise_on_no_answer=True, source_port=0,
+ rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.A,
+ rdclass: Union[dns.rdataclass.RdataClass, str]=dns.rdataclass.IN,
+ tcp: bool=False, source: Optional[str]=None,
+ raise_on_no_answer: bool=True, source_port: int=0,
lifetime: Optional[float]=None, search: Optional[bool]=None,
backend: Optional[dns.asyncbackend.Backend]=None) -> dns.resolver.Answer:
"""Query nameservers asynchronously to find the answer to the question.
return await get_default_resolver().canonical_name(name)
-async def zone_for_name(name: Union[dns.name.Name, str], rdclass=dns.rdataclass.IN,
- tcp=False, resolver: Optional[Resolver]=None,
+async def zone_for_name(name: Union[dns.name.Name, str],
+ rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN,
+ tcp: bool=False, resolver: Optional[Resolver]=None,
backend: Optional[dns.asyncbackend.Backend]=None) -> dns.name.Name:
"""Find the name of the zone which contains the specified name.
return int.from_bytes(b, 'big')
-def _validate_signature(sig: bytes, data: bytes, key: DNSKEY, chosen_hash: Any):
+def _validate_signature(sig: bytes, data: bytes, key: DNSKEY, chosen_hash: Any) -> None:
keyptr: bytes
if _is_rsa(key.algorithm):
# we ignore because mypy is confused and thinks key.key is a str for unknown reasons.
def _validate_rrsig(rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
rrsig: RRSIG,
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
- origin: Optional[dns.name.Name]=None, now: Optional[float]=None):
+ origin: Optional[dns.name.Name]=None, now: Optional[float]=None) -> None:
"""Validate an RRset against a single signature rdata, throwing an
exception if validation is not successful.
def _validate(rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
rrsigset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
- origin: Optional[dns.name.Name]=None, now: Optional[float]=None):
+ origin: Optional[dns.name.Name]=None, now: Optional[float]=None) -> None:
"""Validate an RRset against a signature RRset, throwing an exception
if none of the signatures validate.
raise ValidationFailure("no RRSIGs validated")
-def nsec3_hash(domain, salt, iterations, algorithm):
+def nsec3_hash(domain: Union[dns.name.Name, str], salt: Optional[Union[str, bytes]],
+ iterations: int, algorithm: Union[int, str]) -> str:
"""
Calculate the NSEC3 hash, according to
https://tools.ietf.org/html/rfc5155#section-5
if algorithm != NSEC3Hash.SHA1:
raise ValueError("Wrong hash algorithm (only SHA1 is supported)")
- salt_encoded = salt
if salt is None:
salt_encoded = b''
elif isinstance(salt, str):
salt_encoded = bytes.fromhex(salt)
else:
raise ValueError("Invalid salt length")
+ else:
+ salt_encoded = salt
if not isinstance(domain, dns.name.Name):
domain = dns.name.from_text(domain)
domain_encoded = domain.canonicalize().to_wire()
+ assert domain_encoded is not None
digest = hashlib.sha1(domain_encoded + salt_encoded).digest()
for _ in range(iterations):
def to_e164(name: dns.name.Name, origin: Optional[dns.name.Name]=public_enum_domain,
- want_plus_prefix=True) -> str:
+ want_plus_prefix: bool=True) -> str:
"""Convert an ENUM domain name into an E.164 number.
Note that dnspython does not have any information about preferred
self.scopelen)
@staticmethod
- def from_text(text) -> Option:
+ def from_text(text: str) -> Option:
"""Convert a string into a `dns.edns.ECSOption`
*text*, a `str`, the text form of the option.
raise ValueError('could not parse ECS from "{}"'.format(text))
n_slashes = ecs_text.count('/')
if n_slashes == 1:
- address, srclen = ecs_text.split('/')
- scope = 0
+ address, tsrclen = ecs_text.split('/')
+ tscope = '0'
elif n_slashes == 2:
- address, srclen, scope = ecs_text.split('/')
+ address, tsrclen, tscope = ecs_text.split('/')
else:
raise ValueError('could not parse ECS from "{}"'.format(text))
try:
- scope = int(scope)
+ scope = int(tscope)
except ValueError:
raise ValueError('invalid scope ' +
- '"{}": scope must be an integer'.format(scope))
+ '"{}": scope must be an integer'.format(tscope))
try:
- srclen = int(srclen)
+ srclen = int(tsrclen)
except ValueError:
raise ValueError('invalid srclen ' +
- '"{}": srclen must be an integer'.format(srclen))
+ '"{}": srclen must be an integer'.format(tsrclen))
return ECSOption(address, srclen, scope)
- def to_wire(self, file=None) -> Optional[bytes]:
+ def to_wire(self, file: Optional[Any]=None) -> Optional[bytes]:
value = (struct.pack('!HBB', self.family, self.srclen, self.scopelen) +
self.addrdata)
if file:
with parser.restrict_to(olen):
return option_from_wire_parser(otype, parser)
-def register_type(implementation: Any, otype: OptionType):
+def register_type(implementation: Any, otype: OptionType) -> None:
"""Register the implementation of an option type.
*implementation*, a ``class``, is a subclass of ``dns.edns.Option``.
# leaving this code doesn't hurt anything as the library code
# is used if present.
- def __init__(self, seed=None):
+ def __init__(self, seed: Optional[bytes]=None):
self.pool_index = 0
self.digest: Optional[bytearray] = None
self.next_byte = 0
self.hash_len = 20
self.pool = bytearray(b'\0' * self.hash_len)
if seed is not None:
- self._stir(bytearray(seed))
+ self._stir(seed)
self.seeded = True
self.seed_pid = os.getpid()
else:
self.seeded = False
self.seed_pid = 0
- def _stir(self, entropy):
+ def _stir(self, entropy: bytes) -> None:
for c in entropy:
if self.pool_index == self.hash_len:
self.pool_index = 0
self.pool[self.pool_index] ^= b
self.pool_index += 1
- def stir(self, entropy):
+ def stir(self, entropy: bytes) -> None:
with self.lock:
self._stir(entropy)
- def _maybe_seed(self):
+ def _maybe_seed(self) -> None:
if not self.seeded or self.seed_pid != os.getpid():
try:
seed = os.urandom(16)
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+from typing import Any
+
import collections.abc
from dns._immutable_ctx import immutable
@immutable
class Dict(collections.abc.Mapping): # lgtm[py/missing-equals]
- def __init__(self, dictionary, no_copy=False):
+ def __init__(self, dictionary: Any, no_copy: bool=False):
"""Make an immutable dictionary from the specified dictionary.
If *no_copy* is `True`, then *dictionary* will be wrapped instead
return iter(self._odict)
-def constify(o):
+def constify(o: Any) -> Any:
"""
Convert mutable types to immutable types.
"""
_colon_colon_start = re.compile(br'::.*')
_colon_colon_end = re.compile(br'.*::$')
-def inet_aton(text: Union[str, bytes], ignore_scope=False) -> bytes:
+def inet_aton(text: Union[str, bytes], ignore_scope: bool=False) -> bytes:
"""Convert an IPv6 address in text form to binary form.
*text*, a ``str``, the IPv6 address in textual form.
_mapped_prefix = b'\x00' * 10 + b'\xff\xff'
-def is_mapped(address):
+def is_mapped(address: bytes) -> bool:
"""Is the specified address a mapped IPv4 address?
*address*, a ``bytes`` is an IPv6 address in binary form.
def __str__(self):
return self.to_text()
- def to_text(self, origin: Optional[dns.name.Name]=None, relativize=True,
- **kw):
+ def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True,
+ **kw) -> str:
"""Convert the message to text.
The *origin*, *relativize*, and any other keyword
rdtype: dns.rdatatype.RdataType,
covers = dns.rdatatype.NONE,
deleting: Optional[dns.rdataclass.RdataClass]=None,
- create=False,
- force_unique=False) -> dns.rrset.RRset:
+ create: bool=False,
+ force_unique: bool=False) -> dns.rrset.RRset:
"""Find the RRset with the given attributes in the specified section.
*section*, an ``int`` section number, or one of the section
name: dns.name.Name,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
- covers = dns.rdatatype.NONE,
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE,
deleting: Optional[dns.rdataclass.RdataClass]=None,
- create=False,
- force_unique=False) -> Optional[dns.rrset.RRset]:
+ create: bool=False,
+ force_unique: bool=False) -> Optional[dns.rrset.RRset]:
"""Get the RRset with the given attributes in the specified section.
If the RRset is not found, None is returned.
rrset = None
return rrset
- def to_wire(self, origin: Optional[dns.name.Name]=None, max_size=0,
- multi=False, tsig_ctx: Optional[Any]=None, **kw) -> bytes:
+ def to_wire(self, origin: Optional[dns.name.Name]=None, max_size: int=0,
+ multi: bool=False, tsig_ctx: Optional[Any]=None, **kw) -> bytes:
"""Return a string containing the message in DNS compressed wire
format.
original_id, error, other)
return dns.rrset.from_rdata(keyname, 0, tsig)
- def use_tsig(self, keyring: Any, keyname: Optional[dns.name.Name]=None,
- fudge=300, original_id: Optional[int]=None, tsig_error=0,
- other_data=b'', algorithm=dns.tsig.default_algorithm):
+ def use_tsig(self, keyring: Any, keyname: Optional[Union[dns.name.Name, str]]=None,
+ fudge: int=300, original_id: Optional[int]=None, tsig_error: int=0,
+ other_data: bytes=b'',
+ algorithm: Union[dns.name.Name, str]=dns.tsig.default_algorithm) -> None:
"""When sending, a TSIG signature using the specified key
should be added.
*other_data*, a ``bytes``, the TSIG other data.
- *algorithm*, a ``dns.name.Name``, the TSIG algorithm to use. This is
+ *algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm to use. This is
only used if *keyring* is a ``dict``, and the key entry is a ``bytes``.
"""
options or ())
return dns.rrset.from_rdata(dns.name.root, int(flags), opt)
- def use_edns(self, edns=0, ednsflags=0, payload=DEFAULT_EDNS_PAYLOAD,
+ def use_edns(self, edns: Optional[Union[int, bool]]=0, ednsflags: int=0, payload: int=DEFAULT_EDNS_PAYLOAD,
request_payload: Optional[int]=None,
- options: Optional[List[dns.edns.Option]]=None):
+ options: Optional[List[dns.edns.Option]]=None) -> None:
"""Configure EDNS behavior.
*edns*, an ``int``, is the EDNS level to use. Specifying
else:
return ()
- def want_dnssec(self, wanted=True):
+ def want_dnssec(self, wanted: bool=True) -> None:
"""Enable or disable 'DNSSEC desired' flag in requests.
*wanted*, a ``bool``. If ``True``, then DNSSEC data is
"""
return dns.rcode.from_flags(int(self.flags), int(self.ednsflags))
- def set_rcode(self, rcode: dns.rcode.Rcode):
+ def set_rcode(self, rcode: dns.rcode.Rcode) -> None:
"""Set the rcode.
*rcode*, a ``dns.rcode.Rcode``, is the rcode to set.
"""
return dns.opcode.from_flags(int(self.flags))
- def set_opcode(self, opcode: dns.opcode.Opcode):
+ def set_opcode(self, opcode: dns.opcode.Opcode) -> None:
"""Set the opcode.
*opcode*, a ``dns.opcode.Opcode``, is the opcode to set.
return self.message
-def from_wire(wire, keyring=None, request_mac=b'', xfr=False, origin=None,
- tsig_ctx=None, multi=False,
- question_only=False, one_rr_per_rrset=False,
- ignore_trailing=False, raise_on_truncation=False,
- continue_on_error=False) -> Message:
+def from_wire(wire, keyring: Optional[Any]=None, request_mac: Optional[bytes]=b'',
+ xfr: bool=False, origin: Optional[dns.name.Name]=None,
+ tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]]=None,
+ multi: bool=False, question_only: bool=False, one_rr_per_rrset: bool=False,
+ ignore_trailing: bool=False, raise_on_truncation: bool=False,
+ continue_on_error: bool=False) -> Message:
"""Convert a DNS wire format message into a message object.
*keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the
message is signed.
- *request_mac*, a ``bytes``. If the message is a response to a TSIG-signed
+ *request_mac*, a ``bytes`` or ``None``. If the message is a response to a TSIG-signed
request, *request_mac* should be set to the MAC of that request.
*xfr*, a ``bool``, should be set to ``True`` if this message is part of a
Returns a ``dns.message.Message``.
"""
+ # We permit None for request_mac solely for backwards compatibility
+ if request_mac is None:
+ request_mac = b''
+
def initialize_message(message):
message.request_mac = request_mac
message.xfr = xfr
return self.message
-def from_text(text, idna_codec=None, one_rr_per_rrset=False,
- origin=None, relativize=True, relativize_to=None) -> Message:
+def from_text(text, idna_codec: Optional[dns.name.IDNACodec]=None,
+ one_rr_per_rrset: bool=False, origin: Optional[dns.name.Name]=None,
+ relativize: bool=True, relativize_to: Optional[dns.name.Name]=None) -> Message:
"""Convert the text format message into a message object.
The reader stops after reading the first blank line in the input to
return reader.read()
-def from_file(f, idna_codec=None, one_rr_per_rrset=False) -> Message:
+def from_file(f, idna_codec: Optional[dns.name.IDNACodec]=None, one_rr_per_rrset: bool=False) -> Message:
"""Read the next text format message from the specified file.
Message blocks are separated by a single blank line.
assert False # for mypy lgtm[py/unreachable-statement]
-def make_query(qname, rdtype, rdclass=dns.rdataclass.IN, use_edns=None,
- want_dnssec=False, ednsflags: Optional[int]=None, payload: Optional[int]=None,
+def make_query(qname: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ rdclass: Union[dns.rdataclass.RdataClass, str]=dns.rdataclass.IN,
+ use_edns: Optional[Union[int, bool]]=None,
+ want_dnssec: bool=False, ednsflags: Optional[int]=None, payload: Optional[int]=None,
request_payload: Optional[int]=None, options: Optional[List[dns.edns.Option]]=None,
idna_codec: Optional[dns.name.IDNACodec]=None, id: Optional[int]=None,
flags: int=dns.flags.RD) -> QueryMessage:
if isinstance(qname, str):
qname = dns.name.from_text(qname, idna_codec=idna_codec)
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- rdclass = dns.rdataclass.RdataClass.make(rdclass)
+ the_rdtype = dns.rdatatype.RdataType.make(rdtype)
+ the_rdclass = dns.rdataclass.RdataClass.make(rdclass)
m = QueryMessage(id=id)
m.flags = dns.flags.Flag(flags)
- m.find_rrset(m.question, qname, rdclass, rdtype, create=True,
+ m.find_rrset(m.question, qname, the_rdclass, the_rdtype, create=True,
force_unique=True)
# only pass keywords on to use_edns if they have been set to a
# non-None value. Setting a field will turn EDNS on if it hasn't
"""DNS Names.
"""
-from typing import Dict, Iterable, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, Optional, Tuple, Union
import copy
import struct
IDNA_2008_Transitional = IDNA2008Codec(True, True, False, False)
IDNA_2008 = IDNA_2008_Practical
-def _validate_labels(labels: Tuple[bytes, ...]):
+def _validate_labels(labels: Tuple[bytes, ...]) -> None:
"""Check for empty labels in the middle of a label sequence,
labels that are too long, and for too many labels.
def __str__(self):
return self.to_text(False)
- def to_text(self, omit_final_dot=False) -> str:
+ def to_text(self, omit_final_dot: bool=False) -> str:
"""Convert name to DNS text format.
*omit_final_dot* is a ``bool``. If True, don't emit the final
s = '.'.join(map(_escapify, l))
return s
- def to_unicode(self, omit_final_dot=False, idna_codec: Optional[IDNACodec]=None) -> str:
+ def to_unicode(self, omit_final_dot: bool=False, idna_codec: Optional[IDNACodec]=None) -> str:
"""Convert name to Unicode text format.
IDN ACE labels are converted to Unicode.
assert digest is not None
return digest
- def to_wire(self, file=None, compress: Optional[CompressType]=None,
- origin: Optional['Name']=None, canonicalize=False) -> Optional[bytes]:
+ def to_wire(self, file: Optional[Any]=None, compress: Optional[CompressType]=None,
+ origin: Optional['Name']=None, canonicalize: bool=False) -> Optional[bytes]:
"""Convert name to wire format, possibly compressing it.
*file* is the file where the name is emitted (typically an
else:
return self
- def choose_relativity(self, origin: Optional['Name']=None, relativize=True) -> 'Name':
+ def choose_relativity(self, origin: Optional['Name']=None, relativize: bool=True) -> 'Name':
"""Return a name with the relativity desired by the caller.
If *origin* is ``None``, then the name is returned.
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType=dns.rdatatype.NONE,
- create=False) -> dns.rdataset.Rdataset:
+ create: bool=False) -> dns.rdataset.Rdataset:
"""Find an rdataset matching the specified properties in the
current node.
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
covers: dns.rdatatype.RdataType=dns.rdatatype.NONE,
- create=False) -> Optional[dns.rdataset.Rdataset]:
+ create: bool=False) -> Optional[dns.rdataset.Rdataset]:
"""Get an rdataset matching the specified properties in the
current node.
def delete_rdataset(self,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
- covers: dns.rdatatype.RdataType=dns.rdatatype.NONE):
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE) -> None:
"""Delete the rdataset matching the specified properties in the
current node.
if rds is not None:
self.rdatasets.remove(rds)
- def replace_rdataset(self, replacement: dns.rdataset.Rdataset):
+ def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None:
"""Replace an rdataset.
It is not an error if there is no rdataset matching *replacement*.
[dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
)
- def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
- create=False):
+ def find_rdataset(self,
+ rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE,
+ create: bool=False) -> dns.rdataset.Rdataset:
if create:
raise TypeError("immutable")
return super().find_rdataset(rdclass, rdtype, covers, False)
- def get_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
- create=False):
+ def get_rdataset(self,
+ rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE,
+ create: bool=False) -> Optional[dns.rdataset.Rdataset]:
if create:
raise TypeError("immutable")
return super().get_rdataset(rdclass, rdtype, covers, False)
- def delete_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE):
+ def delete_rdataset(self,
+ rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE) -> None:
raise TypeError("immutable")
- def replace_rdataset(self, replacement):
+ def replace_rdataset(self, replacement) -> None:
raise TypeError("immutable")
def is_immutable(self) -> bool:
raise
def https(q: dns.message.Message, where: str, timeout: Optional[float]=None,
- port=443, source: Optional[str]=None, source_port=0,
- one_rr_per_rrset=False, ignore_trailing=False,
- session: Optional[Any]=None, path='/dns-query', post=True,
- bootstrap_address: Optional[str]=None, verify=True) -> dns.message.Message:
+ port: int=443, source: Optional[str]=None, source_port: int=0,
+ one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
+ session: Optional[Any]=None, path: str='/dns-query', post: bool=True,
+ bootstrap_address: Optional[str]=None, verify: bool=True) -> dns.message.Message:
"""Return the response obtained after sending a query via DNS-over-HTTPS.
*q*, a ``dns.message.Message``, the query to send.
def receive_udp(sock: Any, destination: Optional[Any]=None, expiration: Optional[float]=None,
- ignore_unexpected=False, one_rr_per_rrset=False,
- keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]]=None, request_mac=b'',
- ignore_trailing=False, raise_on_truncation=False) -> Any:
+ ignore_unexpected: bool=False, one_rr_per_rrset: bool=False,
+ keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]]=None, request_mac: Optional[bytes]=b'',
+ ignore_trailing: bool=False, raise_on_truncation: bool=False) -> Any:
"""Read a DNS message from a UDP socket.
*sock*, a ``socket``.
*keyring*, a ``dict``, the keyring to use for TSIG.
- *request_mac*, a ``bytes``, the MAC of the request (for TSIG).
+ *request_mac*, a ``bytes`` or ``None``, the MAC of the request (for TSIG).
*ignore_trailing*, a ``bool``. If ``True``, ignore trailing
junk at end of the received message.
else:
return (r, received_time, from_address)
-def udp(q: dns.message.Message, where: str, timeout: Optional[float]=None, port=53,
- source: Optional[str]=None, source_port=0,
- ignore_unexpected=False, one_rr_per_rrset=False, ignore_trailing=False,
- raise_on_truncation=False, sock: Optional[Any]=None) -> dns.message.Message:
+def udp(q: dns.message.Message, where: str, timeout: Optional[float]=None, port: int=53,
+ source: Optional[str]=None, source_port: int=0,
+ ignore_unexpected: bool=False, one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
+ raise_on_truncation: bool=False, sock: Optional[Any]=None) -> dns.message.Message:
"""Return the response obtained after sending a query via UDP.
*q*, a ``dns.message.Message``, the query to send
return r
assert False # help mypy figure out we can't get here lgtm[py/unreachable-statement]
-def udp_with_fallback(q: dns.message.Message, where: str, timeout: Optional[float]=None, port=53,
- source: Optional[str]=None, source_port=0,
- ignore_unexpected=False, one_rr_per_rrset=False, ignore_trailing=False,
+def udp_with_fallback(q: dns.message.Message, where: str, timeout: Optional[float]=None, port: int=53,
+ source: Optional[str]=None, source_port: int=0,
+ ignore_unexpected: bool=False, one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
udp_sock: Optional[Any]=None,
tcp_sock: Optional[Any]=None) -> Tuple[dns.message.Message, bool]:
"""Return the response to the query, trying UDP first and falling back
_net_write(sock, tcpmsg, expiration)
return (len(tcpmsg), sent_time)
-def receive_tcp(sock: Any, expiration: Optional[float]=None, one_rr_per_rrset=False,
- keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]]=None, request_mac=b'',
- ignore_trailing=False) -> Tuple[dns.message.Message, float]:
+def receive_tcp(sock: Any, expiration: Optional[float]=None, one_rr_per_rrset: bool=False,
+ keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]]=None,
+ request_mac: Optional[bytes]=b'',
+ ignore_trailing: bool=False) -> Tuple[dns.message.Message, float]:
"""Read a DNS message from a TCP socket.
*sock*, a ``socket``.
*keyring*, a ``dict``, the keyring to use for TSIG.
- *request_mac*, a ``bytes``, the MAC of the request (for TSIG).
+ *request_mac*, a ``bytes`` or ``None``, the MAC of the request (for TSIG).
*ignore_trailing*, a ``bool``. If ``True``, ignore trailing
junk at end of the received message.
raise OSError(err, os.strerror(err))
-def tcp(q: dns.message.Message, where: str, timeout: Optional[float]=None, port=53,
- source: Optional[str]=None, source_port=0,
- one_rr_per_rrset=False, ignore_trailing=False, sock: Optional[Any]=None) -> dns.message.Message:
+def tcp(q: dns.message.Message, where: str, timeout: Optional[float]=None, port: int=53,
+ source: Optional[str]=None, source_port: int=0,
+ one_rr_per_rrset: bool=False, ignore_trailing: bool=False,
+ sock: Optional[Any]=None) -> dns.message.Message:
"""Return the response obtained after sending a query via TCP.
*q*, a ``dns.message.Message``, the query to send
def tls(q: dns.message.Message, where: str, timeout: Optional[float]=None,
- port=853, source: Optional[str]=None, source_port=0,
- one_rr_per_rrset=False, ignore_trailing=False, sock: Optional[ssl.SSLSocket]=None,
+ port: int=853, source: Optional[str]=None, source_port: int=0,
+ one_rr_per_rrset: bool=False, ignore_trailing: bool=False, sock: Optional[ssl.SSLSocket]=None,
ssl_context: Optional[ssl.SSLContext]=None,
server_hostname: Optional[str]=None) -> dns.message.Message:
"""Return the response obtained after sending a query via TLS.
return r
assert False # help mypy figure out we can't get here lgtm[py/unreachable-statement]
-def xfr(where, zone, rdtype=dns.rdatatype.AXFR, rdclass=dns.rdataclass.IN,
- timeout=None, port=53, keyring=None, keyname=None, relativize=True,
- lifetime=None, source=None, source_port=0, serial=0,
- use_udp=False, keyalgorithm=dns.tsig.default_algorithm):
+def xfr(where: str, zone: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.AXFR,
+ rdclass: Union[dns.rdataclass.RdataClass, str]=dns.rdataclass.IN,
+ timeout: Optional[float]=None, port: int=53,
+ keyring: Optional[Dict[dns.name.Name, dns.tsig.Key]]=None,
+ keyname: Optional[Union[dns.name.Name, str]]=None, relativize: bool=True,
+ lifetime: Optional[float]=None, source: Optional[str]=None, source_port: int=0,
+ serial: int=0, use_udp: bool=False,
+ keyalgorithm: Union[dns.name.Name, str]=dns.tsig.default_algorithm) -> Any:
"""Return a generator for the responses to a zone transfer.
*where*, a ``str`` containing an IPv4 or IPv6 address, where
def inbound_xfr(where: str, txn_manager: dns.transaction.TransactionManager,
query: Optional[dns.message.Message]=None,
- port=53, timeout: Optional[float]=None, lifetime: Optional[float]=None,
- source: Optional[str]=None, source_port=0, udp_mode=UDPMode.NEVER):
+ port: int=53, timeout: Optional[float]=None, lifetime: Optional[float]=None,
+ source: Optional[str]=None, source_port: int=0, udp_mode=UDPMode.NEVER):
"""Conduct an inbound transfer and apply it via a transaction from the
txn_manager.
"""DNS Result Codes."""
+from typing import Tuple
+
import dns.enum
import dns.exception
"""A DNS rcode is unknown."""
-def from_text(text):
+def from_text(text: str) -> Rcode:
"""Convert text into an rcode.
*text*, a ``str``, the textual rcode or an integer in textual form.
Raises ``dns.rcode.UnknownRcode`` if the rcode mnemonic is unknown.
- Returns an ``int``.
+ Returns a ``dns.rcode.Rcode``.
"""
return Rcode.from_text(text)
-def from_flags(flags, ednsflags):
+def from_flags(flags: int, ednsflags: int) -> Rcode:
"""Return the rcode value encoded by flags and ednsflags.
*flags*, an ``int``, the DNS flags field.
Raises ``ValueError`` if rcode is < 0 or > 4095
- Returns an ``int``.
+ Returns a ``dns.rcode.Rcode``.
"""
value = (flags & 0x000f) | ((ednsflags >> 20) & 0xff0)
- return value
+ return Rcode.make(value)
-def to_flags(value):
+def to_flags(value: Rcode) -> Tuple[int, int]:
"""Return a (flags, ednsflags) tuple which encodes the rcode.
- *value*, an ``int``, the rcode.
+ *value*, a ``dns.rcode.Rcode``, the rcode.
Raises ``ValueError`` if rcode is < 0 or > 4095.
return (v, ev)
-def to_text(value, tsig=False):
+def to_text(value: Rcode, tsig: bool=False) -> str:
"""Convert rcode into text.
- *value*, an ``int``, the rcode.
+ *value*, a ``dns.rcode.Rcode``, the rcode.
Raises ``ValueError`` if rcode is < 0 or > 4095.
return self.covers() << 16 | self.rdtype
- def to_text(self, origin: Optional[dns.name.Name]=None, relativize=True, **kw):
+ def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw):
"""Convert an rdata to text format.
Returns a ``str``.
raise NotImplementedError # pragma: no cover
- def _to_wire(self, file, compress: Optional[dns.name.CompressType]=None,
- origin: Optional[dns.name.Name]=None, canonicalize=False):
+ def _to_wire(self, file: Optional[Any], compress: Optional[dns.name.CompressType]=None,
+ origin: Optional[dns.name.Name]=None, canonicalize: bool=False) -> bytes:
raise NotImplementedError # pragma: no cover
- def to_wire(self, file=None, compress=None, origin=None,
- canonicalize=False) -> bytes:
+ def to_wire(self, file: Optional[Any]=None, compress: Optional[dns.name.CompressType]=None,
+ origin: Optional[dns.name.Name]=None, canonicalize: bool=False) -> bytes:
"""Convert an rdata to wire format.
Returns a ``bytes`` or ``None``.
@classmethod
def from_text(cls, rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
- tok: dns.tokenizer.Tokenizer, origin: Optional[dns.name.Name]=None, relativize=True,
+ tok: dns.tokenizer.Tokenizer, origin: Optional[dns.name.Name]=None, relativize: bool=True,
relativize_to: Optional[dns.name.Name]=None):
raise NotImplementedError # pragma: no cover
@classmethod
def from_wire_parser(cls, rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
- parser: dns.wire.Parser, origin: Optional[dns.name.Name]=None):
+ parser: dns.wire.Parser, origin: Optional[dns.name.Name]=None) -> 'Rdata':
raise NotImplementedError # pragma: no cover
- def replace(self, **kwargs):
+ def replace(self, **kwargs) -> 'Rdata':
"""
Create a new Rdata instance based on the instance replace was
invoked on. It is possible to pass different parameters to
"""
# Get the constructor parameters.
- parameters = inspect.signature(self.__init__).parameters
+ parameters = inspect.signature(self.__init__).parameters # type: ignore
# Ensure that all of the arguments correspond to valid fields.
# Don't allow rdclass or rdtype to be changed, though.
rdtype: Union[dns.rdatatype.RdataType, str],
tok: Union[dns.tokenizer.Tokenizer, str],
origin: Optional[dns.name.Name]=None,
- relativize=True, relativize_to: Optional[dns.name.Name]=None,
+ relativize: bool=True, relativize_to: Optional[dns.name.Name]=None,
idna_codec: Optional[dns.name.IDNACodec]=None) -> Rdata:
"""Build an rdata object from text format.
"already exists."
-def register_type(implementation, rdtype, rdtype_text, is_singleton=False,
- rdclass=dns.rdataclass.IN):
+def register_type(implementation: Any, rdtype: int, rdtype_text: str, is_singleton: bool=False,
+ rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN):
"""Dynamically register a module to handle an rdatatype.
*implementation*, a module implementing the type in the usual dnspython
it applies to all classes.
"""
- existing_cls = get_rdata_class(rdclass, rdtype)
- if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype):
- raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
+ the_rdtype = dns.rdatatype.RdataType.make(rdtype)
+ existing_cls = get_rdata_class(rdclass, the_rdtype)
+ if existing_cls != GenericRdata or dns.rdatatype.is_metatype(the_rdtype):
+ raise RdatatypeExists(rdclass=rdclass, rdtype=the_rdtype)
try:
- if dns.rdatatype.RdataType(rdtype).name != rdtype_text:
- raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
+ if dns.rdatatype.RdataType(the_rdtype).name != rdtype_text:
+ raise RdatatypeExists(rdclass=rdclass, rdtype=the_rdtype)
except ValueError:
pass
- _rdata_classes[(rdclass, rdtype)] = getattr(implementation,
- rdtype_text.replace('-', '_'))
- dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)
+ _rdata_classes[(rdclass, the_rdtype)] = getattr(implementation,
+ rdtype_text.replace('-', '_'))
+ dns.rdatatype.register_type(the_rdtype, rdtype_text, is_singleton)
"""A DNS class is unknown."""
-def from_text(text):
+def from_text(text: str) -> RdataClass:
"""Convert text into a DNS rdata class value.
The input text can be a defined DNS RR class mnemonic or
Raises ``ValueError`` if the rdata class value is not >= 0 and <= 65535.
- Returns an ``int``.
+ Returns a ``dns.rdataclass.RdataClass``.
"""
return RdataClass.from_text(text)
-def to_text(value):
+def to_text(value: RdataClass) -> str:
"""Convert a DNS rdata class value to text.
If the value has a known mnemonic, it will be used, otherwise the
return RdataClass.to_text(value)
-def is_metaclass(rdclass):
+def is_metaclass(rdclass: RdataClass) -> bool:
"""True if the specified class is a metaclass.
The currently defined metaclasses are ANY and NONE.
- *rdclass* is an ``int``.
+ *rdclass* is a ``dns.rdataclass.RdataClass``.
"""
if rdclass in _metaclasses:
def __init__(self, rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
- covers=dns.rdatatype.NONE, ttl=0):
+ covers=dns.rdatatype.NONE, ttl: int=0):
"""Create a new rdataset of the specified class and type.
*rdclass*, a ``dns.rdataclass.RdataClass``, the rdataclass.
obj.ttl = self.ttl
return obj
- def update_ttl(self, ttl: int):
+ def update_ttl(self, ttl: int) -> None:
"""Perform TTL minimization.
Set the TTL of the rdataset to be the lesser of the set's current
elif ttl < self.ttl:
self.ttl = ttl
- def add(self, rd, ttl: Optional[int]=None): # pylint: disable=arguments-differ
+ def add(self, rd: dns.rdata.Rdata, ttl: Optional[int]=None) -> None: # pylint: disable=arguments-differ
"""Add the specified rdata to the rdataset.
If the optional *ttl* parameter is supplied, then
def to_text(self, name: Optional[dns.name.Name]=None,
origin: Optional[dns.name.Name]=None,
- relativize=True,
+ relativize: bool=True,
override_rdclass: Optional[dns.rdataclass.RdataClass]=None,
want_comments=False, **kw) -> str:
"""Convert the rdataset into DNS zone file format.
compress: Optional[dns.name.CompressType]=None,
origin: Optional[dns.name.Name]=None,
override_rdclass: Optional[dns.rdataclass.RdataClass]=None,
- want_shuffle=True) -> int:
+ want_shuffle: bool=True) -> int:
"""Convert the rdataset to wire format.
*name*, a ``dns.name.Name`` is the owner name to use.
ttl: int, text_rdatas: Collection[str],
idna_codec: Optional[dns.name.IDNACodec]=None,
origin: Optional[dns.name.Name]=None,
- relativize=True, relativize_to: Optional[dns.name.Name]=None) -> Rdataset:
+ relativize: bool=True, relativize_to: Optional[dns.name.Name]=None) -> Rdataset:
"""Create an rdataset with the specified class, type, and TTL, and with
the specified list of rdatas in text format.
"""DNS resource record type is unknown."""
-def from_text(text):
+def from_text(text: str) -> RdataType:
"""Convert text into a DNS rdata type value.
The input text can be a defined DNS RR type mnemonic or
Raises ``ValueError`` if the rdata type value is not >= 0 and <= 65535.
- Returns an ``int``.
+ Returns a ``dns.rdatatype.RdataType``.
"""
text = text.upper().replace('-', '_')
raise
-def to_text(value):
+def to_text(value: RdataType) -> str:
"""Convert a DNS rdata type value to text.
If the value has a known mnemonic, it will be used, otherwise the
return text.replace('_', '-')
-def is_metatype(rdtype):
+def is_metatype(rdtype: RdataType) -> bool:
"""True if the specified type is a metatype.
- *rdtype* is an ``int``.
+ *rdtype* is a ``dns.rdatatype.RdataType``.
The currently defined metatypes are TKEY, TSIG, IXFR, AXFR, MAILA,
MAILB, ANY, and OPT.
return (256 > rdtype >= 128) or rdtype in _metatypes
-def is_singleton(rdtype):
+def is_singleton(rdtype: RdataType) -> bool:
"""Is the specified type a singleton type?
Singleton types can only have a single rdata in an rdataset, or a single
return False
# pylint: disable=redefined-outer-name
-def register_type(rdtype, rdtype_text, is_singleton=False):
+def register_type(rdtype: RdataType, rdtype_text: str, is_singleton: bool=False):
"""Dynamically register an rdatatype.
- *rdtype*, an ``int``, the rdatatype to register.
+ *rdtype*, a ``dns.rdatatype.RdataType``, the rdatatype to register.
*rdtype_text*, a ``str``, the textual form of the rdatatype.
"""DNS stub resolver."""
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
import contextlib
import dummy_threading as _threading # type: ignore
import dns.exception
+import dns.edns
import dns.flags
import dns.inet
import dns.ipv4
"""The DNS query name is too long after DNAME substitution."""
-ErrorTuple = Tuple[str, bool, int, Exception, dns.message.Message]
+ErrorTuple = Tuple[Optional[str], bool, int, Union[Exception, str], Optional[dns.message.Message]]
def _errors_to_text(errors: List[ErrorTuple]) -> List[str]:
self.lock = _threading.Lock()
self.statistics = CacheStatistics()
- def reset_statistics(self):
+ def reset_statistics(self) -> None:
"""Reset all statistics to zero."""
with self.lock:
self.statistics.reset()
- def hits(self):
+ def hits(self) -> int:
"""How many hits has the cache had?"""
with self.lock:
return self.statistics.hits
- def misses(self):
+ def misses(self) -> int:
"""How many misses has the cache had?"""
with self.lock:
return self.statistics.misses
class Cache(CacheBase):
"""Simple thread-safe DNS answer cache."""
- def __init__(self, cleaning_interval=300.0):
+ def __init__(self, cleaning_interval: float=300.0):
"""*cleaning_interval*, a ``float`` is the number of seconds between
periodic cleanings.
"""
super().__init__()
- self.data = {}
+ self.data: Dict[CacheKey, Answer] = {}
self.cleaning_interval = cleaning_interval
- self.next_cleaning = time.time() + self.cleaning_interval
+ self.next_cleaning: float = time.time() + self.cleaning_interval
- def _maybe_clean(self):
+ def _maybe_clean(self) -> None:
"""Clean the cache if it's time to do so."""
now = time.time()
self.statistics.hits += 1
return v
- def put(self, key: CacheKey, value: Answer):
+ def put(self, key: CacheKey, value: Answer) -> None:
"""Associate key and value in the cache.
*key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` tuple whose values are the
self._maybe_clean()
self.data[key] = value
- def flush(self, key: Optional[CacheKey]=None):
+ def flush(self, key: Optional[CacheKey]=None) -> None:
"""Flush the cache.
If *key* is not ``None``, only that item is flushed. Otherwise
for a new one.
"""
- def __init__(self, max_size=100000):
+ def __init__(self, max_size: int=100000):
"""*max_size*, an ``int``, is the maximum number of nodes to cache;
it must be greater than 0.
"""
super().__init__()
- self.data = {}
+ self.data: Dict[CacheKey, LRUCacheNode] = {}
self.set_max_size(max_size)
- self.sentinel = LRUCacheNode(None, None)
+ self.sentinel: LRUCacheNode = LRUCacheNode(None, None)
self.sentinel.prev = self.sentinel
self.sentinel.next = self.sentinel
- def set_max_size(self, max_size):
+ def set_max_size(self, max_size: int) -> None:
if max_size < 1:
max_size = 1
self.max_size = max_size
else:
return node.hits
- def put(self, key: CacheKey, value: Answer):
+ def put(self, key: CacheKey, value: Answer) -> None:
"""Associate key and value in the cache.
*key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` tuple whose values are the
node.unlink()
del self.data[node.key]
while len(self.data) >= self.max_size:
- node = self.sentinel.prev
- node.unlink()
- del self.data[node.key]
+ gnode = self.sentinel.prev
+ gnode.unlink()
+ del self.data[gnode.key]
node = LRUCacheNode(key, value)
node.link_after(self.sentinel)
self.data[key] = node
- def flush(self, key: Optional[CacheKey]=None):
+ def flush(self, key: Optional[CacheKey]=None) -> None:
"""Flush the cache.
If *key* is not ``None``, only that item is flushed. Otherwise
node.unlink()
del self.data[node.key]
else:
- node = self.sentinel.next
- while node != self.sentinel:
- next = node.next
- node.unlink()
- node = next
+ gnode = self.sentinel.next
+ while gnode != self.sentinel:
+ next = gnode.next
+ gnode.unlink()
+ gnode = next
self.data = {}
class _Resolution:
tcp: bool, raise_on_no_answer: bool, search: Optional[bool]):
if isinstance(qname, str):
qname = dns.name.from_text(qname, None)
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- if dns.rdatatype.is_metatype(rdtype):
+ the_rdtype = dns.rdatatype.RdataType.make(rdtype)
+ if dns.rdatatype.is_metatype(the_rdtype):
raise NoMetaqueries
- rdclass = dns.rdataclass.RdataClass.make(rdclass)
- if dns.rdataclass.is_metaclass(rdclass):
+ the_rdclass = dns.rdataclass.RdataClass.make(rdclass)
+ if dns.rdataclass.is_metaclass(the_rdclass):
raise NoMetaqueries
self.resolver = resolver
self.qnames_to_try = resolver._get_qnames_to_try(qname, search)
self.qnames = self.qnames_to_try[:]
- self.rdtype = rdtype
- self.rdclass = rdclass
+ self.rdtype = the_rdtype
+ self.rdclass = the_rdclass
self.tcp = tcp
self.raise_on_no_answer = raise_on_no_answer
- self.nxdomain_responses: Dict[dns.name.Name, Answer] = {}
+ self.nxdomain_responses: Dict[dns.name.Name, dns.message.QueryMessage] = {}
# Initialize other things to help analysis tools
self.qname = dns.name.empty
self.nameservers: List[str] = []
raise NXDOMAIN(qnames=self.qnames_to_try,
responses=self.nxdomain_responses)
- def next_nameserver(self):
+ def next_nameserver(self) -> Tuple[str, int, bool, float]:
if self.retry_with_tcp:
assert self.nameserver is not None
self.tcp_attempt = True
self.retry_with_tcp = False
return (self.nameserver, self.port, True, 0)
- backoff = 0
+ backoff = 0.0
if not self.current_nameservers:
if len(self.nameservers) == 0:
# Out of things to try!
self.tcp_attempt = self.tcp
return (self.nameserver, self.port, self.tcp_attempt, backoff)
- def query_result(self, response, ex):
+ def query_result(self, response: Optional[dns.message.Message],
+ ex: Optional[Exception]) -> Tuple[Optional[Answer], bool]:
#
# returns an (answer: Answer, end_loop: bool) tuple.
#
+ assert self.nameserver is not None
if ex:
# Exception during I/O or from_wire()
assert response is None
return (None, False)
# We got an answer!
assert response is not None
+ assert isinstance(response, dns.message.QueryMessage)
rcode = response.rcode()
if rcode == dns.rcode.NOERROR:
try:
#
# pylint: disable=attribute-defined-outside-init
- def __init__(self, filename='/etc/resolv.conf', configure=True):
+ def __init__(self, filename: str='/etc/resolv.conf', configure: bool=True):
"""*filename*, a ``str`` or file object, specifying a file
in standard /etc/resolv.conf format. This parameter is meaningful
only when *configure* is true and the platform is POSIX.
self.rotate = False
self.ndots: Optional[int] = None
- def read_resolv_conf(self, f):
+ def read_resolv_conf(self, f: Any) -> None:
"""Process *f* as a file in the /etc/resolv.conf format. If f is
a ``str``, it is used as the name of the file to open; otherwise it
is treated as the file itself.
if len(self.nameservers) == 0:
raise NoResolverConfiguration('no nameservers')
- def read_registry(self):
+ def read_registry(self) -> None:
"""Extract resolver configuration from the Windows registry."""
try:
- info = dns.win32util.get_dns_info()
+ info = dns.win32util.get_dns_info() # type: ignore
if info.domain is not None:
self.domain = info.domain
self.nameservers = info.nameservers
qnames_to_try.append(abs_qname)
return qnames_to_try
- def use_tsig(self, keyring, keyname=None,
- algorithm=dns.tsig.default_algorithm):
+ def use_tsig(self, keyring: Any, keyname: Optional[Union[dns.name.Name, str]]=None,
+ algorithm: Union[dns.name.Name, str]=dns.tsig.default_algorithm) -> None:
"""Add a TSIG signature to each query.
The parameters are passed to ``dns.message.Message.use_tsig()``;
self.keyname = keyname
self.keyalgorithm = algorithm
- def use_edns(self, edns=0, ednsflags=0,
- payload=dns.message.DEFAULT_EDNS_PAYLOAD, options=None):
+ def use_edns(self, edns: Optional[Union[int, bool]]=0, ednsflags: int=0,
+ payload: int=dns.message.DEFAULT_EDNS_PAYLOAD,
+ options: Optional[List[dns.edns.Option]]=None) -> None:
"""Configure EDNS behavior.
*edns*, an ``int``, is the EDNS level to use. Specifying
self.payload = payload
self.ednsoptions = options
- def set_flags(self, flags: int):
+ def set_flags(self, flags: int) -> None:
"""Overrides the default flags with your own.
*flags*, an ``int``, the message flags to use.
def resolve(self, qname: Union[dns.name.Name, str],
rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.A,
rdclass: Union[dns.rdataclass.RdataClass, str]=dns.rdataclass.IN,
- tcp=False, source: Optional[str]=None, raise_on_no_answer=True, source_port=0,
+ tcp: bool=False, source: Optional[str]=None, raise_on_no_answer: bool=True, source_port: int=0,
lifetime: Optional[float]=None, search: Optional[bool]=None) -> Answer: # pylint: disable=arguments-differ
"""Query nameservers to find the answer to the question.
def query(self, qname: Union[dns.name.Name, str],
rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.A,
rdclass: Union[dns.rdataclass.RdataClass, str]=dns.rdataclass.IN,
- tcp=False, source: Optional[str]=None, raise_on_no_answer=True, source_port=0,
+ tcp: bool=False, source: Optional[str]=None, raise_on_no_answer: bool=True, source_port: int=0,
lifetime: Optional[float]=None) -> Answer: # pragma: no cover
"""Query nameservers to find the answer to the question.
def resolve(qname: Union[dns.name.Name, str],
rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.A,
rdclass: Union[dns.rdataclass.RdataClass, str]=dns.rdataclass.IN,
- tcp=False, source: Optional[str]=None, raise_on_no_answer=True, source_port=0,
+ tcp: bool=False, source: Optional[str]=None, raise_on_no_answer: bool=True, source_port: int=0,
lifetime: Optional[float]=None, search: Optional[bool]=None) -> Answer: # pragma: no cover
"""Query nameservers to find the answer to the question.
def query(qname: Union[dns.name.Name, str],
rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.A,
rdclass: Union[dns.rdataclass.RdataClass, str]=dns.rdataclass.IN,
- tcp=False, source: Optional[str]=None, raise_on_no_answer=True, source_port=0,
+ tcp: bool=False, source: Optional[str]=None, raise_on_no_answer: bool=True, source_port: int=0,
lifetime: Optional[float]=None) -> Answer: # pragma: no cover
"""Query nameservers to find the answer to the question.
def zone_for_name(name: Union[dns.name.Name, str], rdclass=dns.rdataclass.IN,
- tcp=False, resolver: Optional[Resolver]=None,
+ tcp: bool=False, resolver: Optional[Resolver]=None,
lifetime: Optional[float]=None) -> dns.name.Name:
"""Find the name of the zone which contains the specified name.
ipv6_reverse_domain = dns.name.from_text('ip6.arpa.')
-def from_address(text: str, v4_origin=ipv4_reverse_domain,
- v6_origin=ipv6_reverse_domain) -> dns.name.Name:
+def from_address(text: str, v4_origin: dns.name.Name=ipv4_reverse_domain,
+ v6_origin: dns.name.Name=ipv6_reverse_domain) -> dns.name.Name:
"""Convert an IPv4 or IPv6 address in textual form into a Name object whose
value is the reverse-map domain name of the address.
return dns.name.from_text('.'.join(reversed(parts)), origin=origin)
-def to_address(name: dns.name.Name, v4_origin=ipv4_reverse_domain,
- v6_origin=ipv6_reverse_domain) -> str:
+def to_address(name: dns.name.Name, v4_origin: dns.name.Name=ipv4_reverse_domain,
+ v6_origin: dns.name.Name=ipv6_reverse_domain) -> str:
"""Convert a reverse map domain name into textual address form.
*name*, a ``dns.name.Name``, an IPv4 or IPv6 address in reverse-map name
"""DNS RRsets (an RRset is a named rdataset)"""
-from typing import cast, Collection, Optional, Union
+from typing import Any, cast, Collection, Optional, Union
import dns.name
import dns.rdataset
# pylint: disable=arguments-differ
- def to_text(self, origin: Optional[dns.name.Name]=None, relativize=True, **kw) -> str: # type: ignore
+ def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw) -> str: # type: ignore
"""Convert the RRset into DNS zone file format.
See ``dns.name.Name.choose_relativity`` for more information
return super().to_text(self.name, origin, relativize,
self.deleting, **kw)
- def to_wire(self, file, compress: Optional[dns.name.CompressType]=None, # type: ignore
+ def to_wire(self, file: Any, compress: Optional[dns.name.CompressType]=None, # type: ignore
origin: Optional[dns.name.Name]=None, **kw) -> int:
"""Convert the RRset to wire format.
rdtype: Union[dns.rdatatype.RdataType, str],
text_rdatas: Collection[str],
idna_codec: Optional[dns.name.IDNACodec]=None,
- origin: Optional[dns.name.Name]=None, relativize=True,
+ origin: Optional[dns.name.Name]=None, relativize: bool=True,
relativize_to: Optional[dns.name.Name]=None) -> RRset:
"""Create an RRset with the specified name, TTL, class, and type, and with
the specified list of rdatas in text format.
cast(Collection[str], text_rdatas))
-def from_rdata_list(name: Union[dns.name.Name, str], ttl:int,
+def from_rdata_list(name: Union[dns.name.Name, str], ttl: int,
rdatas: Collection[dns.rdata.Rdata],
idna_codec: Optional[dns.name.IDNACodec]=None) -> RRset:
"""Create an RRset with the specified name and TTL, and with
"""Serial Number Arthimetic from RFC 1982"""
class Serial:
- def __init__(self, value:int , bits=32):
+ def __init__(self, value: int, bits: int=32):
self.value = value % 2 ** bits
self.bits = bits
"""Tokenize DNS zone file format"""
-from typing import Optional, List, Tuple
+from typing import Any, Optional, List, Tuple, Union
import io
import sys
has_escape: Does the token value contain escapes?
"""
- def __init__(self, ttype: int, value='', has_escape=False, comment: Optional[str]=None):
+ def __init__(self, ttype: int, value: Any='', has_escape: bool=False,
+ comment: Optional[str]=None):
"""Initialize a token instance."""
self.ttype = ttype
encoder/decoder is used.
"""
- def __init__(self, f=sys.stdin, filename: Optional[str]=None,
+ def __init__(self, f: Any=sys.stdin, filename: Optional[str]=None,
idna_codec: Optional[dns.name.IDNACodec]=None):
"""Initialize a tokenizer instance.
return (self.filename, self.line_number)
- def _unget_char(self, c):
+ def _unget_char(self, c: str) -> None:
"""Unget a character.
The unget buffer for characters is only one character large; it is
raise UngetBufferFull # pragma: no cover
self.ungotten_char = c
- def skip_whitespace(self):
+ def skip_whitespace(self) -> int:
"""Consume input until a non-whitespace character is encountered.
The non-whitespace character is then ungotten, and the number of
return skipped
skipped += 1
- def get(self, want_leading=False, want_comment=False) -> Token:
+ def get(self, want_leading: bool=False, want_comment: bool=False) -> Token:
"""Get the next token.
want_leading: If True, return a WHITESPACE token if the
# Helpers
- def get_int(self, base=10):
+ def get_int(self, base: int=10) -> int:
"""Read the next token and interpret it as an unsigned integer.
Raises dns.exception.SyntaxError if not an unsigned integer.
'%d is not an unsigned 8-bit integer' % value)
return value
- def get_uint16(self, base=10) -> int:
+ def get_uint16(self, base: int=10) -> int:
"""Read the next token and interpret it as a 16-bit unsigned
integer.
'%d is not an unsigned 16-bit integer' % value)
return value
- def get_uint32(self, base=10) -> int:
+ def get_uint32(self, base: int=10) -> int:
"""Read the next token and interpret it as a 32-bit unsigned
integer.
'%d is not an unsigned 32-bit integer' % value)
return value
- def get_uint48(self, base=10) -> int:
+ def get_uint48(self, base: int=10) -> int:
"""Read the next token and interpret it as a 48-bit unsigned
integer.
'%d is not an unsigned 48-bit integer' % value)
return value
- def get_string(self, max_length=None) -> str:
+ def get_string(self, max_length: Optional[int]=None) -> str:
"""Read the next token and interpret it as a string.
Raises dns.exception.SyntaxError if not a string.
raise dns.exception.SyntaxError('expecting an identifier')
return token.value
- def get_remaining(self, max_tokens=None) -> List[Token]:
+ def get_remaining(self, max_tokens: Optional[int]=None) -> List[Token]:
"""Return the remaining tokens on the line, until an EOL or EOF is seen.
max_tokens: If not None, stop after this number of tokens.
break
return tokens
- def concatenate_remaining_identifiers(self, allow_empty=False) -> str:
+ def concatenate_remaining_identifiers(self, allow_empty: bool=False) -> str:
"""Read the remaining tokens on the line, which should be identifiers.
Raises dns.exception.SyntaxError if there are no remaining tokens,
return s
def as_name(self, token: Token, origin: Optional[dns.name.Name]=None,
- relativize=False, relativize_to: Optional[dns.name.Name]=None) -> dns.name.Name:
+ relativize: bool=False, relativize_to: Optional[dns.name.Name]=None) -> dns.name.Name:
"""Try to interpret the token as a DNS name.
Raises dns.exception.SyntaxError if not a name.
name = dns.name.from_text(token.value, origin, self.idna_codec)
return name.choose_relativity(relativize_to or origin, relativize)
- def get_name(self, origin: Optional[dns.name.Name]=None, relativize=False,
+ def get_name(self, origin: Optional[dns.name.Name]=None, relativize: bool=False,
relativize_to: Optional[dns.name.Name]=None) -> dns.name.Name:
"""Read the next token and interpret it as a DNS name.
"""Begin a read-only transaction."""
raise NotImplementedError # pragma: no cover
- def writer(self, replacement=False) -> 'Transaction':
+ def writer(self, replacement: bool=False) -> 'Transaction':
"""Begin a writable transaction.
*replacement*, a ``bool``. If `True`, the content of the
class Transaction:
- def __init__(self, manager: TransactionManager, replacement=False, read_only=False):
+ def __init__(self, manager: TransactionManager, replacement: bool=False, read_only: bool=False):
self.manager = manager
self.replacement = replacement
self.read_only = read_only
rdataset = self._get_rdataset(name, rdtype, covers)
return _ensure_immutable_rdataset(rdataset)
- def get_node(self, name) -> dns.node.Node:
+ def get_node(self, name: dns.name.Name) -> Optional[dns.node.Node]:
"""Return the node at *name*, if any.
Returns an immutable node or ``None``.
"""
return _ensure_immutable_node(self._get_node(name))
- def _check_read_only(self):
+ def _check_read_only(self) -> None:
if self.read_only:
raise ReadOnly
- def add(self, *args):
+ def add(self, *args) -> None:
"""Add records.
The arguments may be:
"""
self._check_ended()
self._check_read_only()
- return self._add(False, args)
+ self._add(False, args)
- def replace(self, *args):
+ def replace(self, *args) -> None:
"""Replace the existing rdataset at the name with the specified
rdataset, or add the specified rdataset if there was no existing
rdataset.
"""
self._check_ended()
self._check_read_only()
- return self._add(True, args)
+ self._add(True, args)
- def delete(self, *args):
+ def delete(self, *args) -> None:
"""Delete records.
It is not an error if some of the records are not in the existing
"""
self._check_ended()
self._check_read_only()
- return self._delete(False, args)
+ self._delete(False, args)
- def delete_exact(self, *args):
+ def delete_exact(self, *args) -> None:
"""Delete records.
The arguments may be:
"""
self._check_ended()
self._check_read_only()
- return self._delete(True, args)
+ self._delete(True, args)
def name_exists(self, name: Union[dns.name.Name, str]) -> bool:
"""Does the specified name exist?"""
name = dns.name.from_text(name, None)
return self._name_exists(name)
- def update_serial(self, value=1, relative=True, name=dns.name.empty):
+ def update_serial(self, value: int=1, relative: bool=True, name: dns.name.Name=dns.name.empty) -> None:
"""Update the serial number.
*value*, an `int`, is an increment if *relative* is `True`, or the
self._check_ended()
return self._changed()
- def commit(self):
+ def commit(self) -> None:
"""Commit the transaction.
Normally transactions are used as context managers and commit
"""
self._end(True)
- def rollback(self):
+ def rollback(self) -> None:
"""Rollback the transaction.
Normally transactions are used as context managers and commit
"""
self._end(False)
- def check_put_rdataset(self, check: CheckPutRdatasetType):
+ def check_put_rdataset(self, check: CheckPutRdatasetType) -> None:
"""Call *check* before putting (storing) an rdataset.
The function is called with the transaction, the name, and the rdataset.
"""
self._check_put_rdataset.append(check)
- def check_delete_rdataset(self, check: CheckDeleteRdatasetType):
+ def check_delete_rdataset(self, check: CheckDeleteRdatasetType) -> None:
"""Call *check* before deleting an rdataset.
The function is called with the transaction, the name, the rdatatype,
"""DNS Dynamic Update Support"""
-from typing import Any, Optional, Union
+from typing import Any, List, Optional, Union
import dns.message
import dns.name
import dns.opcode
import dns.rdata
import dns.rdataclass
+import dns.rdatatype
import dns.rdataset
import dns.tsig
def __init__(self, zone: Optional[Union[dns.name.Name, str]]=None,
rdclass=dns.rdataclass.IN,
keyring: Optional[Any]=None, keyname: Optional[dns.name.Name]=None,
- keyalgorithm=dns.tsig.default_algorithm,
+ keyalgorithm: Union[dns.name.Name, str]=dns.tsig.default_algorithm,
id: Optional[int]=None):
"""Initialize a new DNS Update object.
self.use_tsig(keyring, keyname, algorithm=keyalgorithm)
@property
- def zone(self):
+ def zone(self) -> List[dns.rrset.RRset]:
"""The zone section."""
return self.sections[0]
self.sections[0] = v
@property
- def prerequisite(self):
+ def prerequisite(self) -> List[dns.rrset.RRset]:
"""The prerequisite section."""
return self.sections[1]
self.sections[1] = v
@property
- def update(self):
+ def update(self) -> List[dns.rrset.RRset]:
"""The update section."""
return self.sections[2]
self.origin)
self._add_rr(name, ttl, rd, section=section)
- def add(self, name: Union[dns.name.Name, str], *args):
+ def add(self, name: Union[dns.name.Name, str], *args) -> None:
"""Add records.
The first argument is always a name. The other
self._add(False, self.update, name, *args)
- def delete(self, name: Union[dns.name.Name, str], *args):
+ def delete(self, name: Union[dns.name.Name, str], *args) -> None:
"""Delete records.
The first argument is always a name. The other
self.origin)
self._add_rr(name, 0, rd, dns.rdataclass.NONE)
- def replace(self, name: Union[dns.name.Name, str], *args):
+ def replace(self, name: Union[dns.name.Name, str], *args) -> None:
"""Replace records.
The first argument is always a name. The other
self._add(True, self.update, name, *args)
- def present(self, name: Union[dns.name.Name, str], *args):
+ def present(self, name: Union[dns.name.Name, str], *args) -> None:
"""Require that an owner name (and optionally an rdata type,
or specific rdataset) exists as a prerequisite to the
execution of the update.
dns.rdatatype.NONE, None,
True, True)
- def absent(self, name: Union[dns.name.Name, str], rdtype=None):
+ def absent(self, name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str]=None) -> None:
"""Require that an owner name (and optionally an rdata type) does
not exist as a prerequisite to the execution of the update."""
dns.rdatatype.NONE, None,
True, True)
else:
- rdtype = dns.rdatatype.RdataType.make(rdtype)
+ the_rdtype = dns.rdatatype.RdataType.make(rdtype)
self.find_rrset(self.prerequisite, name,
- dns.rdataclass.NONE, rdtype,
+ dns.rdataclass.NONE, the_rdtype,
dns.rdatatype.NONE, None,
True, True)
import dns.exception
import dns.immutable
import dns.name
+import dns.node
import dns.rdataclass
+import dns.rdataset
import dns.rdatatype
import dns.rdtypes.ANY.SOA
import dns.zone
node_factory = Node
- def __init__(self, origin: Optional[Union[dns.name.Name, str]], rdclass=dns.rdataclass.IN, relativize=True,
+ def __init__(self, origin: Optional[Union[dns.name.Name, str]],
+ rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN, relativize: bool=True,
pruning_policy: Optional[Callable[['Zone', Version], Optional[bool]]]=None):
"""Initialize a versioned zone object.
self._readers.add(txn)
return txn
- def writer(self, replacement=False) -> Transaction:
+ def writer(self, replacement: bool=False) -> Transaction:
event = None
while True:
with self._version_lock:
self._pruning_policy(self, self._versions[0]):
self._versions.popleft()
- def set_max_versions(self, max_versions: Optional[int]):
+ def set_max_versions(self, max_versions: Optional[int]) -> None:
"""Set a pruning policy that retains up to the specified number
of versions
"""
return len(zone._versions) > max_versions
self.set_pruning_policy(policy)
- def set_pruning_policy(self, policy: Optional[Callable[['Zone', Version], Optional[bool]]]):
+ def set_pruning_policy(self, policy: Optional[Callable[['Zone', Version], Optional[bool]]]) -> None:
"""Set the pruning policy for the zone.
The *policy* function takes a `Version` and returns `True` if
id = 1
return id
- def find_node(self, name, create=False):
+ def find_node(self, name: Union[dns.name.Name, str], create: bool=False) -> dns.node.Node:
if create:
raise UseTransaction
return super().find_node(name)
- def delete_node(self, name):
+ def delete_node(self, name: Union[dns.name.Name, str]) -> None:
raise UseTransaction
- def find_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE,
- create=False):
+ def find_rdataset(self, name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE,
+ create: bool=False) -> dns.rdataset.Rdataset:
if create:
raise UseTransaction
rdataset = super().find_rdataset(name, rdtype, covers)
return dns.rdataset.ImmutableRdataset(rdataset)
- def get_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE,
- create=False):
+ def get_rdataset(self, name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE,
+ create: bool=False) -> Optional[dns.rdataset.Rdataset]:
if create:
raise UseTransaction
rdataset = super().get_rdataset(name, rdtype, covers)
- return dns.rdataset.ImmutableRdataset(rdataset)
+ if rdataset is not None:
+ return dns.rdataset.ImmutableRdataset(rdataset)
+ else:
+ return None
- def delete_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE):
+ def delete_rdataset(self, name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE) -> None:
raise UseTransaction
- def replace_rdataset(self, name, replacement):
+ def replace_rdataset(self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset) -> None:
raise UseTransaction
import dns.name
class Parser:
- def __init__(self, wire: bytes, current=0):
+ def __init__(self, wire: bytes, current: int=0):
self.wire = wire
self.current = 0
self.end = len(self.wire)
self.seek(current)
self.furthest = current
- def remaining(self):
+ def remaining(self) -> int:
return self.end - self.current
- def get_bytes(self, size=int) -> bytes:
+ def get_bytes(self, size: int) -> bytes:
assert size >= 0
if size > self.remaining():
raise dns.exception.FormError
self.furthest = max(self.furthest, self.current)
return output
- def get_counted_bytes(self, length_size=1) -> bytes:
+ def get_counted_bytes(self, length_size: int=1) -> bytes:
length = int.from_bytes(self.get_bytes(length_size), 'big')
return self.get_bytes(length)
name = name.relativize(origin)
return name
- def seek(self, where: int):
+ def seek(self, where: int) -> None:
# Note that seeking to the end is OK! (If you try to read
# after such a seek, you'll get an exception as expected.)
if where < 0 or where > self.end:
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-from typing import Any, List, Optional, Tuple
+from typing import Any, List, Optional, Tuple, Union
import dns.exception
import dns.message
State machine for zone transfers.
"""
- def __init__(self, txn_manager: dns.transaction.TransactionManager, rdtype=dns.rdatatype.AXFR,
- serial: Optional[int]=None, is_udp=False):
+ def __init__(self, txn_manager: dns.transaction.TransactionManager,
+ rdtype: dns.rdatatype.RdataType=dns.rdatatype.AXFR,
+ serial: Optional[int]=None, is_udp: bool=False):
"""Initialize an inbound zone transfer.
*txn_manager* is a :py:class:`dns.transaction.TransactionManager`.
def make_query(txn_manager: dns.transaction.TransactionManager, serial: Optional[int]=0,
- use_edns=None, ednsflags: Optional[int]=None, payload: Optional[int]=None,
+ use_edns: Optional[Union[int, bool]]=None, ednsflags: Optional[int]=None, payload: Optional[int]=None,
request_payload: Optional[int]=None, options: Optional[List[dns.edns.Option]]=None,
keyring: Any=None, keyname: Optional[dns.name.Name]=None,
- keyalgorithm=dns.tsig.default_algorithm) -> Tuple[dns.message.QueryMessage, Optional[int]]:
+ keyalgorithm: Union[dns.name.Name, str]=dns.tsig.default_algorithm) \
+ -> Tuple[dns.message.QueryMessage, Optional[int]]:
"""Make an AXFR or IXFR query.
*txn_manager* is a ``dns.transaction.TransactionManager``, typically a
__slots__ = ['rdclass', 'origin', 'nodes', 'relativize']
def __init__(self, origin: Optional[Union[dns.name.Name, str]],
- rdclass=dns.rdataclass.IN, relativize=True):
+ rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN, relativize: bool=True):
"""Initialize a zone object.
*origin* is the origin of the zone. It may be a ``dns.name.Name``,
key = self._validate_name(key)
return key in self.nodes
- def find_node(self, name: Union[dns.name.Name, str], create=False):
+ def find_node(self, name: Union[dns.name.Name, str], create: bool=False) -> dns.node.Node:
"""Find a node in the zone, possibly creating it.
*name*: the name of the node to find.
self.nodes[name] = node
return node
- def get_node(self, name: Union[dns.name.Name, str], create=False):
+ def get_node(self, name: Union[dns.name.Name, str], create: bool=False) -> Optional[dns.node.Node]:
"""Get a node in the zone, possibly creating it.
This method is like ``find_node()``, except it returns None instead
node = None
return node
- def delete_node(self, name: Union[dns.name.Name, str]):
+ def delete_node(self, name: Union[dns.name.Name, str]) -> None:
"""Delete the specified node if it exists.
*name*: the name of the node to find.
def find_rdataset(self, name: Union[dns.name.Name, str],
rdtype: Union[dns.rdatatype.RdataType, str],
covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE,
- create=False) -> dns.rdataset.Rdataset:
+ create: bool=False) -> dns.rdataset.Rdataset:
"""Look for an rdataset with the specified name and type in the zone,
and return an rdataset encapsulating it.
Returns a ``dns.rdataset.Rdataset``.
"""
- name = self._validate_name(name)
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- covers = dns.rdatatype.RdataType.make(covers)
- node = self.find_node(name, create)
- return node.find_rdataset(self.rdclass, rdtype, covers, create)
+ the_name = self._validate_name(name)
+ the_rdtype = dns.rdatatype.RdataType.make(rdtype)
+ the_covers = dns.rdatatype.RdataType.make(covers)
+ node = self.find_node(the_name, create)
+ return node.find_rdataset(self.rdclass, the_rdtype, the_covers, create)
- def get_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE,
- create=False) -> Optional[dns.rdataset.Rdataset]:
+ def get_rdataset(self, name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE,
+ create: bool=False) -> Optional[dns.rdataset.Rdataset]:
"""Look for an rdataset with the specified name and type in the zone.
This method is like ``find_rdataset()``, except it returns None instead
def delete_rdataset(self, name: Union[dns.name.Name, str],
rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE):
+ covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE) -> None:
"""Delete the rdataset matching *rdtype* and *covers*, if it
exists at the node specified by *name*.
RRSIG rdataset.
"""
- name = self._validate_name(name)
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- covers = dns.rdatatype.RdataType.make(covers)
- node = self.get_node(name)
+ the_name = self._validate_name(name)
+ the_rdtype = dns.rdatatype.RdataType.make(rdtype)
+ the_covers = dns.rdatatype.RdataType.make(covers)
+ node = self.get_node(the_name)
if node is not None:
- node.delete_rdataset(self.rdclass, rdtype, covers)
+ node.delete_rdataset(self.rdclass, the_rdtype, the_covers)
if len(node) == 0:
- self.delete_node(name)
+ self.delete_node(the_name)
def replace_rdataset(self, name: Union[dns.name.Name, str],
- replacement: dns.rdataset.Rdataset):
+ replacement: dns.rdataset.Rdataset) -> None:
"""Replace an rdataset at name.
It is not an error if there is no rdataset matching I{replacement}.
for rdata in rds:
yield (name, rds.ttl, rdata)
- def to_file(self, f: Any, sorted=True, relativize=True, nl: Optional[str]=None,
- want_comments=False, want_origin=False):
+ def to_file(self, f: Any, sorted: bool=True, relativize: bool=True, nl: Optional[str]=None,
+ want_comments: bool=False, want_origin: bool=False):
"""Write a zone to a file.
*f*, a file or `str`. If *f* is a string, it is treated
f.write(l)
f.write(nl)
- def to_text(self, sorted=True, relativize=True, nl: Optional[str]=None,
- want_comments=False, want_origin=False):
+ def to_text(self, sorted: bool=True, relativize: bool=True, nl: Optional[str]=None,
+ want_comments: bool=False, want_origin: bool=False):
"""Return a zone's text as though it were written to a file.
*sorted*, a ``bool``. If True, the default, then the file
temp_buffer.close()
return return_value
- def check_origin(self):
+ def check_origin(self) -> None:
"""Do some simple checking of the zone's origin.
Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
if self.relativize:
name = dns.name.empty
else:
+ assert self.origin is not None
name = self.origin
if self.get_rdataset(name, dns.rdatatype.SOA) is None:
raise NoSOA
hasher.update(rrnamebuf + rrfixed + rrlen + rdata)
return hasher.digest()
- def compute_digest(self, hash_algorithm: DigestHashAlgorithm, scheme=DigestScheme.SIMPLE) -> dns.rdtypes.ANY.ZONEMD.ZONEMD:
+ def compute_digest(self, hash_algorithm: DigestHashAlgorithm,
+ scheme=DigestScheme.SIMPLE) -> dns.rdtypes.ANY.ZONEMD.ZONEMD:
serial = self.get_soa().serial
digest = self._compute_digest(hash_algorithm, scheme)
return dns.rdtypes.ANY.ZONEMD.ZONEMD(self.rdclass,
serial, scheme, hash_algorithm,
digest)
- def verify_digest(self, zonemd: Optional[dns.rdtypes.ANY.ZONEMD.ZONEMD]=None):
+ def verify_digest(self, zonemd: Optional[dns.rdtypes.ANY.ZONEMD.ZONEMD]=None) -> None:
digests: Union[dns.rdataset.Rdataset, List[dns.rdtypes.ANY.ZONEMD.ZONEMD]]
if zonemd:
digests = [zonemd]
else:
+ assert self.origin is not None
rds = self.get_rdataset(self.origin, dns.rdatatype.ZONEMD)
if rds is None:
raise NoDigest
return Transaction(self, False,
Version(self, 1, self.nodes, self.origin))
- def writer(self, replacement=False) -> 'Transaction':
+ def writer(self, replacement: bool=False) -> 'Transaction':
txn = Transaction(self, replacement)
txn._setup_version()
return txn
[dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
)
- def find_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
- create=False):
+ def find_rdataset(self, rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE,
+ create: bool=False) -> dns.rdataset.Rdataset:
if create:
raise TypeError("immutable")
return super().find_rdataset(rdclass, rdtype, covers, False)
- def get_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE,
- create=False):
+ def get_rdataset(self, rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE,
+ create: bool=False) -> Optional[dns.rdataset.Rdataset]:
if create:
raise TypeError("immutable")
return super().get_rdataset(rdclass, rdtype, covers, False)
- def delete_rdataset(self, rdclass, rdtype, covers=dns.rdatatype.NONE):
+ def delete_rdataset(self, rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE) -> None:
raise TypeError("immutable")
- def replace_rdataset(self, replacement):
+ def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None:
raise TypeError("immutable")
- def is_immutable(self):
+ def is_immutable(self) -> bool:
return True
class WritableVersion(Version):
- def __init__(self, zone: Zone, replacement=False):
+ def __init__(self, zone: Zone, replacement: bool=False):
# The zone._versions_lock must be held by our caller in a versioned
# zone.
id = zone._get_next_version_id()
else:
return node
- def delete_node(self, name: dns.name.Name):
+ def delete_node(self, name: dns.name.Name) -> None:
name = self._validate_name(name)
if name in self.nodes:
del self.nodes[name]
self.changed.add(name)
- def put_rdataset(self, name: dns.name.Name, rdataset: dns.rdataset.Rdataset):
+ def put_rdataset(self, name: dns.name.Name, rdataset: dns.rdataset.Rdataset) -> None:
node = self._maybe_cow(name)
node.replace_rdataset(rdataset)
def delete_rdataset(self, name: dns.name.Name, rdtype:dns.rdatatype.RdataType,
- covers: dns.rdatatype.RdataType):
+ covers: dns.rdatatype.RdataType) -> None:
node = self._maybe_cow(name)
node.delete_rdataset(self.zone.rdclass, rdtype, covers)
if len(node) == 0:
def from_text(text: str, origin: Optional[Union[dns.name.Name, str]]=None,
- rdclass=dns.rdataclass.IN,
- relativize=True, zone_factory=Zone, filename: Optional[str]=None,
- allow_include=False, check_origin=True,
+ rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN,
+ relativize: bool=True, zone_factory: Any=Zone, filename: Optional[str]=None,
+ allow_include: bool=False, check_origin: bool=True,
idna_codec: Optional[dns.name.IDNACodec]=None) -> Zone:
"""Build a zone object from a zone file format string.
def from_file(f: Any, origin: Optional[Union[dns.name.Name, str]]=None,
- rdclass=dns.rdataclass.IN,
- relativize=True, zone_factory=Zone, filename: Optional[str]=None,
- allow_include=True, check_origin=True) -> Zone:
+ rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN,
+ relativize: bool=True, zone_factory: Any=Zone, filename: Optional[str]=None,
+ allow_include: bool=True, check_origin: bool=True) -> Zone:
"""Read a zone file and build a zone object.
*f*, a file or ``str``. If *f* is a string, it is treated
assert False # make mypy happy lgtm[py/unreachable-statement]
-def from_xfr(xfr, zone_factory=Zone, relativize=True, check_origin=True):
+def from_xfr(xfr: Any, zone_factory=Zone, relativize: bool=True, check_origin: bool=True) -> Zone:
"""Convert the output of a zone transfer generator into a zone object.
*xfr*, a generator of ``dns.message.Message`` objects, typically
Raises ``KeyError`` if there is no origin node.
+ Raises ``ValueError`` if no messages are yielded by the generator.
+
Returns a subclass of ``dns.zone.Zone``.
"""
zrds.update_ttl(rrset.ttl)
for rd in rrset:
zrds.add(rd)
+ if z is None:
+ raise ValueError('empty transfer')
if check_origin:
z.check_origin()
return z
SavedStateType = Tuple[dns.tokenizer.Tokenizer,
Optional[dns.name.Name], # current_origin
Optional[dns.name.Name], # last_name
- Optional[str], # current_file
+ Optional[Any], # current_file
int, # last_ttl
bool, # last_ttl_known
int, # default_ttl
"""Read a DNS zone file into a transaction."""
def __init__(self, tok: dns.tokenizer.Tokenizer, rdclass: dns.rdataclass.RdataClass,
- txn: dns.transaction.Transaction, allow_include=False,
- allow_directives=True, force_name: Optional[dns.name.Name]=None,
+ txn: dns.transaction.Transaction, allow_include: bool=False,
+ allow_directives: bool=True, force_name: Optional[dns.name.Name]=None,
force_ttl: Optional[int]=None,
force_rdclass: Optional[dns.rdataclass.RdataClass]=None,
force_rdtype: Optional[dns.rdatatype.RdataType]=None,
self.zone_rdclass = rdclass
self.txn = txn
self.saved_state: List[SavedStateType] = []
- self.current_file = None
+ self.current_file: Optional[Any] = None
self.allow_include = allow_include
self.allow_directives = allow_directives
self.force_name = force_name
self.txn.add(name, ttl, rd)
- def read(self):
+ def read(self) -> None:
"""Read a DNS zone file and build a zone object.
@raises dns.zone.NoSOA: No SOA RR was found at the zone origin
token = self.tok.get()
filename = token.value
token = self.tok.get()
+ new_origin: Optional[dns.name.Name]
if token.is_identifier():
- new_origin =\
- dns.name.from_text(token.value,
- self.current_origin,
- self.tok.idna_codec)
+ new_origin = dns.name.from_text(token.value, self.current_origin, self.tok.idna_codec)
self.tok.get_eol()
elif not token.is_eol_or_eof():
raise dns.exception.SyntaxError(
default_ttl: Optional[Union[int, str]]=None,
idna_codec: Optional[dns.name.IDNACodec]=None,
origin: Optional[Union[dns.name.Name, str]]=dns.name.root,
- relativize=False) -> List[dns.rrset.RRset]:
+ relativize: bool=False) -> List[dns.rrset.RRset]:
"""Read one or more rrsets from the specified text, possibly subject
to restrictions.