python -m pip install --upgrade pip
python -m pip install poetry
poetry install -E dnssec -E doh -E idna -E trio -E curio
+ - name: Typecheck
+ run: |
+ poetry run python -m mypy --disallow-incomplete-defs dns
- name: Test with pytest
run: |
poetry run pytest --cov=dns --cov-branch --cov-report=xml:coverage.xml
poetry run pytest --lf
potype:
- poetry run python -m mypy dns/*.py
+ poetry run python -m mypy --disallow-incomplete-defs dns
potypetests:
poetry run python -m mypy --check-untyped-defs examples tests
class DatagramSocket(dns._asyncbackend.DatagramSocket):
- def __init__(self, family: int, transport, protocol):
+ def __init__(self, family, transport, protocol):
super().__init__(family)
self.transport = transport
self.protocol = protocol
return set_default_backend(sniff())
-def set_default_backend(name: str):
+def set_default_backend(name: str) -> Backend:
"""Set the default backend.
It's not normally necessary to call this method, as
async def inbound_xfr(where: str, txn_manager: dns.transaction.TransactionManager,
query: Optional[dns.message.Message]=None,
port: int=53, timeout: Optional[float]=None, lifetime: Optional[float]=None,
- source: Optional[str]=None, source_port: int=0, udp_mode=UDPMode.NEVER,
+ source: Optional[str]=None, source_port: int=0, udp_mode: UDPMode=UDPMode.NEVER,
backend: Optional[dns.asyncbackend.Backend]=None) -> None:
"""Conduct an inbound transfer and apply it via a transaction from the
txn_manager.
"""Asynchronous DNS stub resolver."""
-from typing import Optional, Union
+from typing import Any, Dict, Optional, Union
import time
if answer is not None:
return answer
- async def resolve_address(self, ipaddr: str, *args, **kwargs) -> dns.resolver.Answer:
+ async def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Dict[str, Any]) -> dns.resolver.Answer:
"""Use an asynchronous resolver to run a reverse query for PTR
records.
# We make a modified kwargs for type checking happiness, as otherwise
# we get a legit warning about possibly having rdtype and rdclass
# in the kwargs more than once.
- modified_kwargs = {}
+ modified_kwargs: Dict[str, Any] = {}
modified_kwargs.update(kwargs)
modified_kwargs['rdtype'] = dns.rdatatype.PTR
modified_kwargs['rdclass'] = dns.rdataclass.IN
backend)
-async def resolve_address(ipaddr: str, *args, **kwargs) -> dns.resolver.Answer:
+async def resolve_address(ipaddr: str, *args: Any, **kwargs: Dict[str, Any]) -> dns.resolver.Answer:
"""Use a resolver to run a reverse query for PTR records.
See :py:func:`dns.asyncresolver.Resolver.resolve_address` for more
return cast(DS, ds)
-def _find_candidate_keys(keys, rrsig: RRSIG) -> Optional[List[DNSKEY]]:
+def _find_candidate_keys(keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]],
+ rrsig: RRSIG) -> Optional[List[DNSKEY]]:
value = keys.get(rrsig.signer)
if isinstance(value, dns.node.Node):
rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY)
class ECSOption(Option): # lgtm[py/missing-equals]
"""EDNS Client Subnet (ECS, RFC7871)"""
- def __init__(self, address: str, srclen: Optional[int]=None, scopelen=0):
+ def __init__(self, address: str, srclen: Optional[int]=None, scopelen: int=0):
"""*address*, a ``str``, is the client address information.
*srclen*, an ``int``, the source prefix length, which is the
return value
@classmethod
- def from_wire_parser(cls, otype: Union[OptionType, str], parser: 'dns.wire.Parser'):
+ def from_wire_parser(cls, otype: Union[OptionType, str], parser: 'dns.wire.Parser') -> Option:
family, src, scope = parser.get_struct('!HBB')
addrlen = int(math.ceil(src / 8.0))
prefix = parser.get_bytes(addrlen)
return value
@classmethod
- def from_wire_parser(cls, otype: Union[OptionType, str], parser) -> Option:
- code = parser.get_uint16()
+ def from_wire_parser(cls, otype: Union[OptionType, str], parser: 'dns.wire.Parser') -> Option:
+ the_code = EDECode.make(parser.get_uint16())
text = parser.get_remaining()
if text:
if text[-1] == 0: # text MAY be null-terminated
text = text[:-1]
- text = text.decode('utf8')
+ btext = text.decode('utf8')
else:
- text = None
+ btext = None
- return cls(code, text)
+ return cls(the_code, btext)
_type_to_class: Dict[OptionType, Any] = {
return self.to_text()
def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True,
- **kw) -> str:
+ **kw: Dict[str, Any]) -> str:
"""Convert the message to text.
The *origin*, *relativize*, and any other keyword
name: dns.name.Name,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
- covers = dns.rdatatype.NONE,
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE,
deleting: Optional[dns.rdataclass.RdataClass]=None,
create: bool=False,
force_unique: bool=False) -> dns.rrset.RRset:
return rrset
def to_wire(self, origin: Optional[dns.name.Name]=None, max_size: int=0,
- multi: bool=False, tsig_ctx: Optional[Any]=None, **kw) -> bytes:
+ multi: bool=False, tsig_ctx: Optional[Any]=None, **kw: Dict[str, Any]) -> bytes:
"""Return a string containing the message in DNS compressed wire
format.
return self.message
-def from_wire(wire, keyring: Optional[Any]=None, request_mac: Optional[bytes]=b'',
+def from_wire(wire: bytes, keyring: Optional[Any]=None, request_mac: Optional[bytes]=b'',
xfr: bool=False, origin: Optional[dns.name.Name]=None,
tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]]=None,
multi: bool=False, question_only: bool=False, one_rr_per_rrset: bool=False,
return self.message
-def from_text(text, idna_codec: Optional[dns.name.IDNACodec]=None,
+def from_text(text: str, idna_codec: Optional[dns.name.IDNACodec]=None,
one_rr_per_rrset: bool=False, origin: Optional[dns.name.Name]=None,
relativize: bool=True, relativize_to: Optional[dns.name.Name]=None) -> Message:
"""Convert the text format message into a message object.
return reader.read()
-def from_file(f, idna_codec: Optional[dns.name.IDNACodec]=None, one_rr_per_rrset: bool=False) -> Message:
+def from_file(f: Any, idna_codec: Optional[dns.name.IDNACodec]=None, one_rr_per_rrset: bool=False) -> Message:
"""Read the next text format message from the specified file.
Message blocks are separated by a single blank line.
return m
-def make_response(query, recursion_available=False, our_payload=8192,
- fudge=300, tsig_error=0) -> Message:
+def make_response(query: Message, recursion_available: bool=False, our_payload: int=8192,
+ fudge: int=300, tsig_error: int=0) -> Message:
"""Make a message which is a response for the specified query.
The message returned is really a response skeleton; it has all
of the infrastructure required of a response, but none of the
class IDNA2003Codec(IDNACodec):
"""IDNA 2003 encoder/decoder."""
- def __init__(self, strict_decode=False):
+ def __init__(self, strict_decode: bool=False):
"""Initialize the IDNA 2003 encoder/decoder.
*strict_decode* is a ``bool``. If `True`, then IDNA2003 checking
"""IDNA 2008 encoder/decoder.
"""
- def __init__(self, uts_46: bool=False, transitional=False,
- allow_pure_ascii=False, strict_decode=False):
+ def __init__(self, uts_46: bool=False, transitional: bool=False,
+ allow_pure_ascii: bool=False, strict_decode: bool=False):
"""Initialize the IDNA 2008 encoder/decoder.
*uts_46* is a ``bool``. If True, apply Unicode IDNA
"""DNS nodes. A node is a set of rdatasets."""
-from typing import List, Optional
+from typing import Any, Dict, List, Optional
import enum
import io
# the set of rdatasets, represented as a list.
self.rdatasets: List[dns.rdataset.Rdataset] = []
- def to_text(self, name: dns.name.Name, **kw) -> str:
+ def to_text(self, name: dns.name.Name, **kw: Dict[str, Any]) -> str:
"""Convert a node to text format.
Each rdataset at the node is printed. Any keyword arguments
s = io.StringIO()
for rds in self.rdatasets:
if len(rds) > 0:
- s.write(rds.to_text(name, **kw))
+ s.write(rds.to_text(name, **kw)) # type: ignore[arg-type]
s.write('\n')
return s.getvalue()[:-1]
covers: dns.rdatatype.RdataType=dns.rdatatype.NONE) -> None:
raise TypeError("immutable")
- def replace_rdataset(self, replacement) -> None:
+ def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None:
raise TypeError("immutable")
def is_immutable(self) -> bool:
def inbound_xfr(where: str, txn_manager: dns.transaction.TransactionManager,
query: Optional[dns.message.Message]=None,
port: int=53, timeout: Optional[float]=None, lifetime: Optional[float]=None,
- source: Optional[str]=None, source_port: int=0, udp_mode=UDPMode.NEVER):
+ source: Optional[str]=None, source_port: int=0, udp_mode: UDPMode=UDPMode.NEVER) -> None:
"""Conduct an inbound transfer and apply it via a transaction from the
txn_manager.
return self.covers() << 16 | self.rdtype
- def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw):
+ def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw: Dict[str, Any]) -> str:
"""Convert an rdata to text format.
Returns a ``str``.
def from_text(cls, rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
tok: dns.tokenizer.Tokenizer, origin: Optional[dns.name.Name]=None, relativize: bool=True,
- relativize_to: Optional[dns.name.Name]=None):
+ relativize_to: Optional[dns.name.Name]=None) -> 'Rdata':
raise NotImplementedError # pragma: no cover
@classmethod
parser: dns.wire.Parser, origin: Optional[dns.name.Name]=None) -> 'Rdata':
raise NotImplementedError # pragma: no cover
- def replace(self, **kwargs) -> 'Rdata':
+ def replace(self, **kwargs: Dict[str, Any]) -> 'Rdata':
"""
Create a new Rdata instance based on the instance replace was
invoked on. It is possible to pass different parameters to
return dns.rdatatype.RdataType.make(value)
@classmethod
- def _as_bytes(cls, value, encode=False, max_length=None, empty_ok=True) -> bytes:
+ def _as_bytes(cls, value: Any, encode: bool=False, max_length: Optional[int]=None,
+ empty_ok: bool=True) -> bytes:
if encode and isinstance(value, str):
bvalue = value.encode()
elif isinstance(value, bytearray):
random.shuffle(items)
return items
-
+@dns.immutable.immutable
class GenericRdata(Rdata):
"""Generic Rdata Class
def __init__(self, rdclass, rdtype, data):
super().__init__(rdclass, rdtype)
- object.__setattr__(self, 'data', data)
+ self.data = data
- def to_text(self, origin=None, relativize=True, **kw):
+ def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw: Dict[str, Any]) -> str:
return r'\# %d ' % len(self.data) + _hexify(self.data, **kw)
@classmethod
def register_type(implementation: Any, rdtype: int, rdtype_text: str, is_singleton: bool=False,
- rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN):
+ rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN) -> None:
"""Dynamically register a module to handle an rdatatype.
*implementation*, a module implementing the type in the usual dnspython
"""DNS rdatasets (an rdataset is a set of rdatas of a given type and class)"""
-from typing import Any, cast, Collection, List, Optional, Union
+from typing import Any, cast, Collection, Dict, Iterable, List, Optional, Union
import io
import random
def __init__(self, rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
- covers=dns.rdatatype.NONE, ttl: int=0):
+ covers: dns.rdatatype.RdataType=dns.rdatatype.NONE, ttl: int=0):
"""Create a new rdataset of the specified class and type.
*rdclass*, a ``dns.rdataclass.RdataClass``, the rdataclass.
origin: Optional[dns.name.Name]=None,
relativize: bool=True,
override_rdclass: Optional[dns.rdataclass.RdataClass]=None,
- want_comments=False, **kw) -> str:
+ want_comments: bool=False, **kw: Dict[str, Any]) -> str:
"""Convert the rdataset into DNS zone file format.
See ``dns.name.Name.choose_relativity`` for more information
def from_text(rdclass: Union[dns.rdataclass.RdataClass, str],
rdtype: Union[dns.rdatatype.RdataType, str],
- ttl: int, *text_rdatas) -> Rdataset:
+ ttl: int, *text_rdatas: Any) -> Rdataset:
"""Create an rdataset with the specified class, type, and TTL, and with
the specified rdatas in text format.
return r
-def from_rdata(ttl: int, *rdatas) -> Rdataset:
+def from_rdata(ttl: int, *rdatas: Any) -> Rdataset:
"""Create an rdataset with the specified TTL, and with
the specified rdata objects.
return False
# pylint: disable=redefined-outer-name
-def register_type(rdtype: RdataType, rdtype_text: str, is_singleton: bool=False):
+def register_type(rdtype: RdataType, rdtype_text: str, is_singleton: bool=False) -> None:
"""Dynamically register an rdatatype.
*rdtype*, a ``dns.rdatatype.RdataType``, the rdatatype to register.
import struct
import dns.immutable
+import dns.rdata
@dns.immutable.immutable
import struct
import dns.immutable
+import dns.rdata
@dns.immutable.immutable
import dns.exception
import dns.immutable
+import dns.rdata
@dns.immutable.immutable
"""TXT-like base class."""
-from typing import Iterable, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, Optional, Tuple, Union
import struct
__slots__ = ['strings']
- def __init__(self, rdclass, rdtype, strings: Iterable[Union[bytes, str]]):
+ def __init__(self, rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ strings: Iterable[Union[bytes, str]]):
"""Initialize a TXT-like rdata.
*rdclass*, an ``int`` is the rdataclass of the Rdata.
super().__init__(rdclass, rdtype)
self.strings: Tuple[bytes] = self._as_tuple(strings, lambda x: self._as_bytes(x, True, 255))
- def to_text(self, origin: Optional[dns.name.Name]=None, relativize=True, **kw):
+ def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw: Dict[str, Any]) -> str:
txt = ''
prefix = ''
for s in self.strings:
return txt
@classmethod
- def from_text(cls, rdclass, rdtype, tok: dns.tokenizer.Tokenizer, origin: Optional[dns.name.Name]=None,
- relativize=True, relativize_to: Optional[dns.name.Name]=None):
+ def from_text(cls, rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ tok: dns.tokenizer.Tokenizer, origin: Optional[dns.name.Name]=None,
+ relativize: bool=True, relativize_to: Optional[dns.name.Name]=None) -> dns.rdata.Rdata:
strings = []
for token in tok.get_remaining():
token = token.unescape_to_bytes()
return self._nameservers
@nameservers.setter
- def nameservers(self, nameservers: List[str]):
+ def nameservers(self, nameservers: List[str]) -> None:
"""
*nameservers*, a ``list`` of nameservers.
raise_on_no_answer, source_port, lifetime,
True)
- def resolve_address(self, ipaddr: str, *args, **kwargs) -> Answer:
+ def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Dict[str, Any]) -> Answer:
"""Use a resolver to run a reverse query for PTR records.
This utilizes the resolve() method to perform a PTR lookup on the
# We make a modified kwargs for type checking happiness, as otherwise
# we get a legit warning about possibly having rdtype and rdclass
# in the kwargs more than once.
- modified_kwargs = {}
+ modified_kwargs: Dict[str, Any] = {}
modified_kwargs.update(kwargs)
modified_kwargs['rdtype'] = dns.rdatatype.PTR
modified_kwargs['rdclass'] = dns.rdataclass.IN
return self.resolve(dns.reversename.from_address(ipaddr),
- *args, **modified_kwargs)
+ *args, **modified_kwargs) # type: ignore[arg-type]
# pylint: disable=redefined-outer-name
True)
-def resolve_address(ipaddr: str, *args, **kwargs) -> Answer:
+def resolve_address(ipaddr: str, *args: Any, **kwargs: Dict[str, Any]) -> Answer:
"""Use a resolver to run a reverse query for PTR records.
See ``dns.resolver.Resolver.resolve_address`` for more information on the
return get_default_resolver().canonical_name(name)
-def zone_for_name(name: Union[dns.name.Name, str], rdclass=dns.rdataclass.IN,
+def zone_for_name(name: Union[dns.name.Name, str], rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN,
tcp: bool=False, resolver: Optional[Resolver]=None,
lifetime: Optional[float]=None) -> dns.name.Name:
"""Find the name of the zone which contains the specified name.
return (canonical, aliases, addresses)
-def override_system_resolver(resolver: Optional[Resolver]=None):
+def override_system_resolver(resolver: Optional[Resolver]=None) -> None:
"""Override the system resolver routines in the socket module with
versions which use dnspython's resolver.
socket.gethostbyaddr = _gethostbyaddr
-def restore_system_resolver():
+def restore_system_resolver() -> None:
"""Undo the effects of prior override_system_resolver()."""
global _resolver
"""DNS RRsets (an RRset is a named rdataset)"""
-from typing import Any, cast, Collection, Optional, Union
+from typing import Any, cast, Collection, Dict, Optional, Union
import dns.name
import dns.rdataset
return False
return super().__eq__(other)
- def match(self, *args, **kwargs) -> bool:
+ def match(self, *args: Any, **kwargs: Dict[str, Any]) -> bool: # type: ignore[override]
"""Does this rrset match the specified attributes?
Behaves as :py:func:`full_match()` if the first argument is a
compatibility.)
"""
if isinstance(args[0], dns.name.Name):
- return self.full_match(*args, **kwargs)
+ return self.full_match(*args, **kwargs) # type: ignore[arg-type]
else:
- return super().match(*args, **kwargs)
+ return super().match(*args, **kwargs) # type: ignore[arg-type]
def full_match(self, name: dns.name.Name, rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType,
def from_text(name: Union[dns.name.Name, str], ttl: int,
rdclass: Union[dns.rdataclass.RdataClass, str],
rdtype: Union[dns.rdatatype.RdataType, str],
- *text_rdatas) -> RRset:
+ *text_rdatas: Any) -> RRset:
"""Create an RRset with the specified name, TTL, class, and type and with
the specified rdatas in text format.
return r
-def from_rdata(name: Union[dns.name.Name, str], ttl:int, *rdatas) -> RRset:
+def from_rdata(name: Union[dns.name.Name, str], ttl:int, *rdatas: Any) -> RRset:
"""Create an RRset with the specified name and TTL, and with
the specified rdata objects.
ttype = EOF
return Token(ttype, token, has_escape)
- def unget(self, token: Token):
+ def unget(self, token: Token) -> None:
"""Unget a token.
The unget buffer for tokens is only one token large; it is
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
-from typing import Callable, List, Optional, Tuple, Union
+from typing import Any, Callable, List, Optional, Tuple, Union
import collections
if self.read_only:
raise ReadOnly
- def add(self, *args) -> None:
+ def add(self, *args: Any) -> None:
"""Add records.
The arguments may be:
self._check_read_only()
self._add(False, args)
- def replace(self, *args) -> None:
+ def replace(self, *args: Any) -> None:
"""Replace the existing rdataset at the name with the specified
rdataset, or add the specified rdataset if there was no existing
rdataset.
self._check_read_only()
self._add(True, args)
- def delete(self, *args) -> None:
+ def delete(self, *args: Any) -> None:
"""Delete records.
It is not an error if some of the records are not in the existing
self._check_read_only()
self._delete(False, args)
- def delete_exact(self, *args) -> None:
+ def delete_exact(self, *args: Any) -> None:
"""Delete records.
The arguments may be:
"""
self._check_delete_rdataset.append(check)
- def check_delete_name(self, check: CheckDeleteNameType):
+ def check_delete_name(self, check: CheckDeleteNameType) -> None:
"""Call *check* before putting (storing) an rdataset.
The function is called with the transaction and the name.
_section_enum = UpdateSection # type: ignore
def __init__(self, zone: Optional[Union[dns.name.Name, str]]=None,
- rdclass=dns.rdataclass.IN,
+ rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN,
keyring: Optional[Any]=None, keyname: Optional[dns.name.Name]=None,
keyalgorithm: Union[dns.name.Name, str]=dns.tsig.default_algorithm,
id: Optional[int]=None):
self.origin)
self._add_rr(name, ttl, rd, section=section)
- def add(self, name: Union[dns.name.Name, str], *args) -> None:
+ def add(self, name: Union[dns.name.Name, str], *args: Any) -> None:
"""Add records.
The first argument is always a name. The other
self._add(False, self.update, name, *args)
- def delete(self, name: Union[dns.name.Name, str], *args) -> None:
+ def delete(self, name: Union[dns.name.Name, str], *args: Any) -> None:
"""Delete records.
The first argument is always a name. The other
True, True)
else:
for s in largs:
- rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s,
+ rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s, # type: ignore[arg-type]
self.origin)
self._add_rr(name, 0, rd, dns.rdataclass.NONE)
- def replace(self, name: Union[dns.name.Name, str], *args) -> None:
+ def replace(self, name: Union[dns.name.Name, str], *args: Any) -> None:
"""Replace records.
The first argument is always a name. The other
self._add(True, self.update, name, *args)
- def present(self, name: Union[dns.name.Name, str], *args) -> None:
+ def present(self, name: Union[dns.name.Name, str], *args: Any) -> None:
"""Require that an owner name (and optionally an rdata type,
or specific rdataset) exists as a prerequisite to the
execution of the update.
if not isinstance(args[0], dns.rdataset.Rdataset):
# Add a 0 TTL
largs = list(args)
- largs.insert(0, 0)
+ largs.insert(0, 0) # type: ignore[arg-type]
self._add(False, self.prerequisite, name, *largs)
else:
self._add(False, self.prerequisite, name, *args)
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
-from typing import Optional, Tuple
+from typing import Iterator, Optional, Tuple
import contextlib
import struct
self.current = where
@contextlib.contextmanager
- def restrict_to(self, size: int):
+ def restrict_to(self, size: int) -> Iterator:
assert size >= 0
if size > self.remaining():
raise dns.exception.FormError
self.end = saved_end
@contextlib.contextmanager
- def restore_furthest(self):
+ def restore_furthest(self) -> Iterator:
try:
yield None
finally:
rrset = None
return rrset
- def iterate_rdatasets(self, rdtype=dns.rdatatype.ANY,
- covers=dns.rdatatype.NONE) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]:
+ def iterate_rdatasets(self, rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.ANY,
+ covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE) \
+ -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]:
"""Return a generator which yields (name, rdataset) tuples for
all rdatasets in the zone which have the specified *rdtype*
and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
(rds.rdtype == rdtype and rds.covers == covers):
yield (name, rds)
- def iterate_rdatas(self, rdtype=dns.rdatatype.ANY,
- covers=dns.rdatatype.NONE) -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]:
+ def iterate_rdatas(self, rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.ANY,
+ covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE) \
+ -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]:
"""Return a generator which yields (name, ttl, rdata) tuples for
all rdatas in the zone which have the specified *rdtype*
and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
yield (name, rds.ttl, rdata)
def to_file(self, f: Any, sorted: bool=True, relativize: bool=True, nl: Optional[str]=None,
- want_comments: bool=False, want_origin: bool=False):
+ want_comments: bool=False, want_origin: bool=False) -> None:
"""Write a zone to a file.
*f*, a file or `str`. If *f* is a string, it is treated
f.write(nl)
def to_text(self, sorted: bool=True, relativize: bool=True, nl: Optional[str]=None,
- want_comments: bool=False, want_origin: bool=False):
+ want_comments: bool=False, want_origin: bool=False) -> str:
"""Return a zone's text as though it were written to a file.
*sorted*, a ``bool``. If True, the default, then the file
raise NoSOA
return soa[0]
- def _compute_digest(self, hash_algorithm: DigestHashAlgorithm, scheme=DigestScheme.SIMPLE) -> bytes:
+ def _compute_digest(self, hash_algorithm: DigestHashAlgorithm, scheme: DigestScheme=DigestScheme.SIMPLE) -> bytes:
hashinfo = _digest_hashers.get(hash_algorithm)
if not hashinfo:
raise UnsupportedDigestHashAlgorithm
return hasher.digest()
def compute_digest(self, hash_algorithm: DigestHashAlgorithm,
- scheme=DigestScheme.SIMPLE) -> dns.rdtypes.ANY.ZONEMD.ZONEMD:
+ scheme: DigestScheme=DigestScheme.SIMPLE) -> dns.rdtypes.ANY.ZONEMD.ZONEMD:
serial = self.get_soa().serial
digest = self._compute_digest(hash_algorithm, scheme)
return dns.rdtypes.ANY.ZONEMD.ZONEMD(self.rdclass,
self.nodes = {}
self.origin = origin
- def _validate_name(self, name: dns.name.Name):
+ def _validate_name(self, name: dns.name.Name) -> dns.name.Name:
if name.is_absolute():
if self.origin is None:
# This should probably never happen as other code (e.g.
self.origin = zone.origin
self.changed: Set[dns.name.Name] = set()
- def _maybe_cow(self, name: dns.name.Name):
+ def _maybe_cow(self, name: dns.name.Name) -> dns.node.Node:
name = self._validate_name(name)
node = self.nodes.get(name)
if node is None or name not in self.changed:
assert False # make mypy happy lgtm[py/unreachable-statement]
-def from_xfr(xfr: Any, zone_factory=Zone, relativize: bool=True, check_origin: bool=True) -> Zone:
+def from_xfr(xfr: Any, zone_factory: Any=Zone, relativize: bool=True, check_origin: bool=True) -> Zone:
"""Convert the output of a zone transfer generator into a zone object.
*xfr*, a generator of ``dns.message.Message`` objects, typically