From: Bob Halley Date: Fri, 11 Mar 2022 14:54:03 +0000 (-0800) Subject: One more pass, after adding --disallow-incomplete-defs X-Git-Tag: v2.3.0rc1~99^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a4936b6b245cc289aaf363eb8dc9891c22c1adea;p=thirdparty%2Fdnspython.git One more pass, after adding --disallow-incomplete-defs --- diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index b8447749..6a6ad37c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -42,6 +42,9 @@ jobs: python -m pip install --upgrade pip python -m pip install poetry poetry install -E dnssec -E doh -E idna -E trio -E curio + - name: Typecheck + run: | + poetry run python -m mypy --disallow-incomplete-defs dns - name: Test with pytest run: | poetry run pytest --cov=dns --cov-branch --cov-report=xml:coverage.xml diff --git a/Makefile b/Makefile index fe4e8bd9..f5e54389 100644 --- a/Makefile +++ b/Makefile @@ -56,7 +56,7 @@ potestlf: poetry run pytest --lf potype: - poetry run python -m mypy dns/*.py + poetry run python -m mypy --disallow-incomplete-defs dns potypetests: poetry run python -m mypy --check-untyped-defs examples tests diff --git a/dns/_asyncio_backend.py b/dns/_asyncio_backend.py index 9d458da0..10917774 100644 --- a/dns/_asyncio_backend.py +++ b/dns/_asyncio_backend.py @@ -55,7 +55,7 @@ async def _maybe_wait_for(awaitable, timeout): class DatagramSocket(dns._asyncbackend.DatagramSocket): - def __init__(self, family: int, transport, protocol): + def __init__(self, family, transport, protocol): super().__init__(family) self.transport = transport self.protocol = protocol diff --git a/dns/asyncbackend.py b/dns/asyncbackend.py index 0c6fc0ae..ffd6d674 100644 --- a/dns/asyncbackend.py +++ b/dns/asyncbackend.py @@ -83,7 +83,7 @@ def get_default_backend() -> Backend: return set_default_backend(sniff()) -def set_default_backend(name: str): +def set_default_backend(name: str) -> Backend: """Set the default backend. It's not normally necessary to call this method, as diff --git a/dns/asyncquery.py b/dns/asyncquery.py index 6c441c07..977f0d41 100644 --- a/dns/asyncquery.py +++ b/dns/asyncquery.py @@ -468,7 +468,7 @@ async def https(q: dns.message.Message, where: str, timeout: Optional[float]=Non async def inbound_xfr(where: str, txn_manager: dns.transaction.TransactionManager, query: Optional[dns.message.Message]=None, port: int=53, timeout: Optional[float]=None, lifetime: Optional[float]=None, - source: Optional[str]=None, source_port: int=0, udp_mode=UDPMode.NEVER, + source: Optional[str]=None, source_port: int=0, udp_mode: UDPMode=UDPMode.NEVER, backend: Optional[dns.asyncbackend.Backend]=None) -> None: """Conduct an inbound transfer and apply it via a transaction from the txn_manager. diff --git a/dns/asyncresolver.py b/dns/asyncresolver.py index 152b1a63..e196dbbc 100644 --- a/dns/asyncresolver.py +++ b/dns/asyncresolver.py @@ -17,7 +17,7 @@ """Asynchronous DNS stub resolver.""" -from typing import Optional, Union +from typing import Any, Dict, Optional, Union import time @@ -109,7 +109,7 @@ class Resolver(dns.resolver.BaseResolver): if answer is not None: return answer - async def resolve_address(self, ipaddr: str, *args, **kwargs) -> dns.resolver.Answer: + async def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Dict[str, Any]) -> dns.resolver.Answer: """Use an asynchronous resolver to run a reverse query for PTR records. @@ -127,7 +127,7 @@ class Resolver(dns.resolver.BaseResolver): # We make a modified kwargs for type checking happiness, as otherwise # we get a legit warning about possibly having rdtype and rdclass # in the kwargs more than once. - modified_kwargs = {} + modified_kwargs: Dict[str, Any] = {} modified_kwargs.update(kwargs) modified_kwargs['rdtype'] = dns.rdatatype.PTR modified_kwargs['rdclass'] = dns.rdataclass.IN @@ -202,7 +202,7 @@ async def resolve(qname: Union[dns.name.Name, str], backend) -async def resolve_address(ipaddr: str, *args, **kwargs) -> dns.resolver.Answer: +async def resolve_address(ipaddr: str, *args: Any, **kwargs: Dict[str, Any]) -> dns.resolver.Answer: """Use a resolver to run a reverse query for PTR records. See :py:func:`dns.asyncresolver.Resolver.resolve_address` for more diff --git a/dns/dnssec.py b/dns/dnssec.py index 598734b9..331f4afc 100644 --- a/dns/dnssec.py +++ b/dns/dnssec.py @@ -143,7 +143,8 @@ def make_ds(name: Union[dns.name.Name, str], key: dns.rdata.Rdata, return cast(DS, ds) -def _find_candidate_keys(keys, rrsig: RRSIG) -> Optional[List[DNSKEY]]: +def _find_candidate_keys(keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], + rrsig: RRSIG) -> Optional[List[DNSKEY]]: value = keys.get(rrsig.signer) if isinstance(value, dns.node.Node): rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY) diff --git a/dns/edns.py b/dns/edns.py index b47b6d24..d4dca55a 100644 --- a/dns/edns.py +++ b/dns/edns.py @@ -175,7 +175,7 @@ class GenericOption(Option): # lgtm[py/missing-equals] class ECSOption(Option): # lgtm[py/missing-equals] """EDNS Client Subnet (ECS, RFC7871)""" - def __init__(self, address: str, srclen: Optional[int]=None, scopelen=0): + def __init__(self, address: str, srclen: Optional[int]=None, scopelen: int=0): """*address*, a ``str``, is the client address information. *srclen*, an ``int``, the source prefix length, which is the @@ -292,7 +292,7 @@ class ECSOption(Option): # lgtm[py/missing-equals] return value @classmethod - def from_wire_parser(cls, otype: Union[OptionType, str], parser: 'dns.wire.Parser'): + def from_wire_parser(cls, otype: Union[OptionType, str], parser: 'dns.wire.Parser') -> Option: family, src, scope = parser.get_struct('!HBB') addrlen = int(math.ceil(src / 8.0)) prefix = parser.get_bytes(addrlen) @@ -376,18 +376,18 @@ class EDEOption(Option): # lgtm[py/missing-equals] return value @classmethod - def from_wire_parser(cls, otype: Union[OptionType, str], parser) -> Option: - code = parser.get_uint16() + def from_wire_parser(cls, otype: Union[OptionType, str], parser: 'dns.wire.Parser') -> Option: + the_code = EDECode.make(parser.get_uint16()) text = parser.get_remaining() if text: if text[-1] == 0: # text MAY be null-terminated text = text[:-1] - text = text.decode('utf8') + btext = text.decode('utf8') else: - text = None + btext = None - return cls(code, text) + return cls(the_code, btext) _type_to_class: Dict[OptionType, Any] = { diff --git a/dns/message.py b/dns/message.py index a375c7e9..0e1e4336 100644 --- a/dns/message.py +++ b/dns/message.py @@ -196,7 +196,7 @@ class Message: return self.to_text() def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, - **kw) -> str: + **kw: Dict[str, Any]) -> str: """Convert the message to text. The *origin*, *relativize*, and any other keyword @@ -325,7 +325,7 @@ class Message: name: dns.name.Name, rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, - covers = dns.rdatatype.NONE, + covers: dns.rdatatype.RdataType=dns.rdatatype.NONE, deleting: Optional[dns.rdataclass.RdataClass]=None, create: bool=False, force_unique: bool=False) -> dns.rrset.RRset: @@ -440,7 +440,7 @@ class Message: return rrset def to_wire(self, origin: Optional[dns.name.Name]=None, max_size: int=0, - multi: bool=False, tsig_ctx: Optional[Any]=None, **kw) -> bytes: + multi: bool=False, tsig_ctx: Optional[Any]=None, **kw: Dict[str, Any]) -> bytes: """Return a string containing the message in DNS compressed wire format. @@ -1068,7 +1068,7 @@ class _WireReader: return self.message -def from_wire(wire, keyring: Optional[Any]=None, request_mac: Optional[bytes]=b'', +def from_wire(wire: bytes, keyring: Optional[Any]=None, request_mac: Optional[bytes]=b'', xfr: bool=False, origin: Optional[dns.name.Name]=None, tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]]=None, multi: bool=False, question_only: bool=False, one_rr_per_rrset: bool=False, @@ -1388,7 +1388,7 @@ class _TextReader: return self.message -def from_text(text, idna_codec: Optional[dns.name.IDNACodec]=None, +def from_text(text: str, idna_codec: Optional[dns.name.IDNACodec]=None, one_rr_per_rrset: bool=False, origin: Optional[dns.name.Name]=None, relativize: bool=True, relativize_to: Optional[dns.name.Name]=None) -> Message: """Convert the text format message into a message object. @@ -1430,7 +1430,7 @@ def from_text(text, idna_codec: Optional[dns.name.IDNACodec]=None, return reader.read() -def from_file(f, idna_codec: Optional[dns.name.IDNACodec]=None, one_rr_per_rrset: bool=False) -> Message: +def from_file(f: Any, idna_codec: Optional[dns.name.IDNACodec]=None, one_rr_per_rrset: bool=False) -> Message: """Read the next text format message from the specified file. Message blocks are separated by a single blank line. @@ -1545,8 +1545,8 @@ def make_query(qname: Union[dns.name.Name, str], return m -def make_response(query, recursion_available=False, our_payload=8192, - fudge=300, tsig_error=0) -> Message: +def make_response(query: Message, recursion_available: bool=False, our_payload: int=8192, + fudge: int=300, tsig_error: int=0) -> Message: """Make a message which is a response for the specified query. The message returned is really a response skeleton; it has all of the infrastructure required of a response, but none of the diff --git a/dns/name.py b/dns/name.py index 5fd10a29..daf1259c 100644 --- a/dns/name.py +++ b/dns/name.py @@ -186,7 +186,7 @@ class IDNACodec: class IDNA2003Codec(IDNACodec): """IDNA 2003 encoder/decoder.""" - def __init__(self, strict_decode=False): + def __init__(self, strict_decode: bool=False): """Initialize the IDNA 2003 encoder/decoder. *strict_decode* is a ``bool``. If `True`, then IDNA2003 checking @@ -223,8 +223,8 @@ class IDNA2008Codec(IDNACodec): """IDNA 2008 encoder/decoder. """ - def __init__(self, uts_46: bool=False, transitional=False, - allow_pure_ascii=False, strict_decode=False): + def __init__(self, uts_46: bool=False, transitional: bool=False, + allow_pure_ascii: bool=False, strict_decode: bool=False): """Initialize the IDNA 2008 encoder/decoder. *uts_46* is a ``bool``. If True, apply Unicode IDNA diff --git a/dns/node.py b/dns/node.py index 8727e42d..5270b53a 100644 --- a/dns/node.py +++ b/dns/node.py @@ -17,7 +17,7 @@ """DNS nodes. A node is a set of rdatasets.""" -from typing import List, Optional +from typing import Any, Dict, List, Optional import enum import io @@ -92,7 +92,7 @@ class Node: # the set of rdatasets, represented as a list. self.rdatasets: List[dns.rdataset.Rdataset] = [] - def to_text(self, name: dns.name.Name, **kw) -> str: + def to_text(self, name: dns.name.Name, **kw: Dict[str, Any]) -> str: """Convert a node to text format. Each rdataset at the node is printed. Any keyword arguments @@ -108,7 +108,7 @@ class Node: s = io.StringIO() for rds in self.rdatasets: if len(rds) > 0: - s.write(rds.to_text(name, **kw)) + s.write(rds.to_text(name, **kw)) # type: ignore[arg-type] s.write('\n') return s.getvalue()[:-1] @@ -336,7 +336,7 @@ class ImmutableNode(Node): covers: dns.rdatatype.RdataType=dns.rdatatype.NONE) -> None: raise TypeError("immutable") - def replace_rdataset(self, replacement) -> None: + def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None: raise TypeError("immutable") def is_immutable(self) -> bool: diff --git a/dns/query.py b/dns/query.py index 1ba57790..09d51078 100644 --- a/dns/query.py +++ b/dns/query.py @@ -1097,7 +1097,7 @@ class UDPMode(enum.IntEnum): def inbound_xfr(where: str, txn_manager: dns.transaction.TransactionManager, query: Optional[dns.message.Message]=None, port: int=53, timeout: Optional[float]=None, lifetime: Optional[float]=None, - source: Optional[str]=None, source_port: int=0, udp_mode=UDPMode.NEVER): + source: Optional[str]=None, source_port: int=0, udp_mode: UDPMode=UDPMode.NEVER) -> None: """Conduct an inbound transfer and apply it via a transaction from the txn_manager. diff --git a/dns/rdata.py b/dns/rdata.py index 24e5fde5..155e1248 100644 --- a/dns/rdata.py +++ b/dns/rdata.py @@ -191,7 +191,7 @@ class Rdata: return self.covers() << 16 | self.rdtype - def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw): + def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw: Dict[str, Any]) -> str: """Convert an rdata to text format. Returns a ``str``. @@ -354,7 +354,7 @@ class Rdata: def from_text(cls, rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, tok: dns.tokenizer.Tokenizer, origin: Optional[dns.name.Name]=None, relativize: bool=True, - relativize_to: Optional[dns.name.Name]=None): + relativize_to: Optional[dns.name.Name]=None) -> 'Rdata': raise NotImplementedError # pragma: no cover @classmethod @@ -363,7 +363,7 @@ class Rdata: parser: dns.wire.Parser, origin: Optional[dns.name.Name]=None) -> 'Rdata': raise NotImplementedError # pragma: no cover - def replace(self, **kwargs) -> 'Rdata': + def replace(self, **kwargs: Dict[str, Any]) -> 'Rdata': """ Create a new Rdata instance based on the instance replace was invoked on. It is possible to pass different parameters to @@ -415,7 +415,8 @@ class Rdata: return dns.rdatatype.RdataType.make(value) @classmethod - def _as_bytes(cls, value, encode=False, max_length=None, empty_ok=True) -> bytes: + def _as_bytes(cls, value: Any, encode: bool=False, max_length: Optional[int]=None, + empty_ok: bool=True) -> bytes: if encode and isinstance(value, str): bvalue = value.encode() elif isinstance(value, bytearray): @@ -540,7 +541,7 @@ class Rdata: random.shuffle(items) return items - +@dns.immutable.immutable class GenericRdata(Rdata): """Generic Rdata Class @@ -553,9 +554,9 @@ class GenericRdata(Rdata): def __init__(self, rdclass, rdtype, data): super().__init__(rdclass, rdtype) - object.__setattr__(self, 'data', data) + self.data = data - def to_text(self, origin=None, relativize=True, **kw): + def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw: Dict[str, Any]) -> str: return r'\# %d ' % len(self.data) + _hexify(self.data, **kw) @classmethod @@ -770,7 +771,7 @@ class RdatatypeExists(dns.exception.DNSException): def register_type(implementation: Any, rdtype: int, rdtype_text: str, is_singleton: bool=False, - rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN): + rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN) -> None: """Dynamically register a module to handle an rdatatype. *implementation*, a module implementing the type in the usual dnspython diff --git a/dns/rdataset.py b/dns/rdataset.py index b47057fd..c4b86445 100644 --- a/dns/rdataset.py +++ b/dns/rdataset.py @@ -17,7 +17,7 @@ """DNS rdatasets (an rdataset is a set of rdatas of a given type and class)""" -from typing import Any, cast, Collection, List, Optional, Union +from typing import Any, cast, Collection, Dict, Iterable, List, Optional, Union import io import random @@ -53,7 +53,7 @@ class Rdataset(dns.set.Set): def __init__(self, rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, - covers=dns.rdatatype.NONE, ttl: int=0): + covers: dns.rdatatype.RdataType=dns.rdatatype.NONE, ttl: int=0): """Create a new rdataset of the specified class and type. *rdclass*, a ``dns.rdataclass.RdataClass``, the rdataclass. @@ -186,7 +186,7 @@ class Rdataset(dns.set.Set): origin: Optional[dns.name.Name]=None, relativize: bool=True, override_rdclass: Optional[dns.rdataclass.RdataClass]=None, - want_comments=False, **kw) -> str: + want_comments: bool=False, **kw: Dict[str, Any]) -> str: """Convert the rdataset into DNS zone file format. See ``dns.name.Name.choose_relativity`` for more information @@ -446,7 +446,7 @@ def from_text_list(rdclass: Union[dns.rdataclass.RdataClass, str], def from_text(rdclass: Union[dns.rdataclass.RdataClass, str], rdtype: Union[dns.rdatatype.RdataType, str], - ttl: int, *text_rdatas) -> Rdataset: + ttl: int, *text_rdatas: Any) -> Rdataset: """Create an rdataset with the specified class, type, and TTL, and with the specified rdatas in text format. @@ -475,7 +475,7 @@ def from_rdata_list(ttl: int, rdatas: Collection[dns.rdata.Rdata]) -> Rdataset: return r -def from_rdata(ttl: int, *rdatas) -> Rdataset: +def from_rdata(ttl: int, *rdatas: Any) -> Rdataset: """Create an rdataset with the specified TTL, and with the specified rdata objects. diff --git a/dns/rdatatype.py b/dns/rdatatype.py index 18185bca..aded5bdb 100644 --- a/dns/rdatatype.py +++ b/dns/rdatatype.py @@ -212,7 +212,7 @@ def is_singleton(rdtype: RdataType) -> bool: return False # pylint: disable=redefined-outer-name -def register_type(rdtype: RdataType, rdtype_text: str, is_singleton: bool=False): +def register_type(rdtype: RdataType, rdtype_text: str, is_singleton: bool=False) -> None: """Dynamically register an rdatatype. *rdtype*, a ``dns.rdatatype.RdataType``, the rdatatype to register. diff --git a/dns/rdtypes/ANY/L32.py b/dns/rdtypes/ANY/L32.py index 47eff958..038fc3a3 100644 --- a/dns/rdtypes/ANY/L32.py +++ b/dns/rdtypes/ANY/L32.py @@ -3,6 +3,7 @@ import struct import dns.immutable +import dns.rdata @dns.immutable.immutable diff --git a/dns/rdtypes/ANY/LP.py b/dns/rdtypes/ANY/LP.py index b6a2e36c..a4adffb3 100644 --- a/dns/rdtypes/ANY/LP.py +++ b/dns/rdtypes/ANY/LP.py @@ -3,6 +3,7 @@ import struct import dns.immutable +import dns.rdata @dns.immutable.immutable diff --git a/dns/rdtypes/IN/DHCID.py b/dns/rdtypes/IN/DHCID.py index a9185989..c1c70b46 100644 --- a/dns/rdtypes/IN/DHCID.py +++ b/dns/rdtypes/IN/DHCID.py @@ -19,6 +19,7 @@ import base64 import dns.exception import dns.immutable +import dns.rdata @dns.immutable.immutable diff --git a/dns/rdtypes/txtbase.py b/dns/rdtypes/txtbase.py index 7ad7914f..afef98e4 100644 --- a/dns/rdtypes/txtbase.py +++ b/dns/rdtypes/txtbase.py @@ -17,7 +17,7 @@ """TXT-like base class.""" -from typing import Iterable, Optional, Tuple, Union +from typing import Any, Dict, Iterable, Optional, Tuple, Union import struct @@ -34,7 +34,9 @@ class TXTBase(dns.rdata.Rdata): __slots__ = ['strings'] - def __init__(self, rdclass, rdtype, strings: Iterable[Union[bytes, str]]): + def __init__(self, rdclass: dns.rdataclass.RdataClass, + rdtype: dns.rdatatype.RdataType, + strings: Iterable[Union[bytes, str]]): """Initialize a TXT-like rdata. *rdclass*, an ``int`` is the rdataclass of the Rdata. @@ -46,7 +48,7 @@ class TXTBase(dns.rdata.Rdata): super().__init__(rdclass, rdtype) self.strings: Tuple[bytes] = self._as_tuple(strings, lambda x: self._as_bytes(x, True, 255)) - def to_text(self, origin: Optional[dns.name.Name]=None, relativize=True, **kw): + def to_text(self, origin: Optional[dns.name.Name]=None, relativize: bool=True, **kw: Dict[str, Any]) -> str: txt = '' prefix = '' for s in self.strings: @@ -55,8 +57,10 @@ class TXTBase(dns.rdata.Rdata): return txt @classmethod - def from_text(cls, rdclass, rdtype, tok: dns.tokenizer.Tokenizer, origin: Optional[dns.name.Name]=None, - relativize=True, relativize_to: Optional[dns.name.Name]=None): + def from_text(cls, rdclass: dns.rdataclass.RdataClass, + rdtype: dns.rdatatype.RdataType, + tok: dns.tokenizer.Tokenizer, origin: Optional[dns.name.Name]=None, + relativize: bool=True, relativize_to: Optional[dns.name.Name]=None) -> dns.rdata.Rdata: strings = [] for token in tok.get_remaining(): token = token.unescape_to_bytes() diff --git a/dns/resolver.py b/dns/resolver.py index 28769f4e..e7411b98 100644 --- a/dns/resolver.py +++ b/dns/resolver.py @@ -1007,7 +1007,7 @@ class BaseResolver: return self._nameservers @nameservers.setter - def nameservers(self, nameservers: List[str]): + def nameservers(self, nameservers: List[str]) -> None: """ *nameservers*, a ``list`` of nameservers. @@ -1156,7 +1156,7 @@ class Resolver(BaseResolver): raise_on_no_answer, source_port, lifetime, True) - def resolve_address(self, ipaddr: str, *args, **kwargs) -> Answer: + def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Dict[str, Any]) -> Answer: """Use a resolver to run a reverse query for PTR records. This utilizes the resolve() method to perform a PTR lookup on the @@ -1172,12 +1172,12 @@ class Resolver(BaseResolver): # We make a modified kwargs for type checking happiness, as otherwise # we get a legit warning about possibly having rdtype and rdclass # in the kwargs more than once. - modified_kwargs = {} + modified_kwargs: Dict[str, Any] = {} modified_kwargs.update(kwargs) modified_kwargs['rdtype'] = dns.rdatatype.PTR modified_kwargs['rdclass'] = dns.rdataclass.IN return self.resolve(dns.reversename.from_address(ipaddr), - *args, **modified_kwargs) + *args, **modified_kwargs) # type: ignore[arg-type] # pylint: disable=redefined-outer-name @@ -1266,7 +1266,7 @@ def query(qname: Union[dns.name.Name, str], True) -def resolve_address(ipaddr: str, *args, **kwargs) -> Answer: +def resolve_address(ipaddr: str, *args: Any, **kwargs: Dict[str, Any]) -> Answer: """Use a resolver to run a reverse query for PTR records. See ``dns.resolver.Resolver.resolve_address`` for more information on the @@ -1286,7 +1286,7 @@ def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name: return get_default_resolver().canonical_name(name) -def zone_for_name(name: Union[dns.name.Name, str], rdclass=dns.rdataclass.IN, +def zone_for_name(name: Union[dns.name.Name, str], rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN, tcp: bool=False, resolver: Optional[Resolver]=None, lifetime: Optional[float]=None) -> dns.name.Name: """Find the name of the zone which contains the specified name. @@ -1595,7 +1595,7 @@ def _gethostbyaddr(ip): return (canonical, aliases, addresses) -def override_system_resolver(resolver: Optional[Resolver]=None): +def override_system_resolver(resolver: Optional[Resolver]=None) -> None: """Override the system resolver routines in the socket module with versions which use dnspython's resolver. @@ -1621,7 +1621,7 @@ def override_system_resolver(resolver: Optional[Resolver]=None): socket.gethostbyaddr = _gethostbyaddr -def restore_system_resolver(): +def restore_system_resolver() -> None: """Undo the effects of prior override_system_resolver().""" global _resolver diff --git a/dns/rrset.py b/dns/rrset.py index e14433ee..bfa47630 100644 --- a/dns/rrset.py +++ b/dns/rrset.py @@ -17,7 +17,7 @@ """DNS RRsets (an RRset is a named rdataset)""" -from typing import Any, cast, Collection, Optional, Union +from typing import Any, cast, Collection, Dict, Optional, Union import dns.name import dns.rdataset @@ -79,7 +79,7 @@ class RRset(dns.rdataset.Rdataset): return False return super().__eq__(other) - def match(self, *args, **kwargs) -> bool: + def match(self, *args: Any, **kwargs: Dict[str, Any]) -> bool: # type: ignore[override] """Does this rrset match the specified attributes? Behaves as :py:func:`full_match()` if the first argument is a @@ -92,9 +92,9 @@ class RRset(dns.rdataset.Rdataset): compatibility.) """ if isinstance(args[0], dns.name.Name): - return self.full_match(*args, **kwargs) + return self.full_match(*args, **kwargs) # type: ignore[arg-type] else: - return super().match(*args, **kwargs) + return super().match(*args, **kwargs) # type: ignore[arg-type] def full_match(self, name: dns.name.Name, rdclass: dns.rdataclass.RdataClass, rdtype: dns.rdatatype.RdataType, covers: dns.rdatatype.RdataType, @@ -194,7 +194,7 @@ def from_text_list(name: Union[dns.name.Name, str], ttl: int, def from_text(name: Union[dns.name.Name, str], ttl: int, rdclass: Union[dns.rdataclass.RdataClass, str], rdtype: Union[dns.rdatatype.RdataType, str], - *text_rdatas) -> RRset: + *text_rdatas: Any) -> RRset: """Create an RRset with the specified name, TTL, class, and type and with the specified rdatas in text format. @@ -234,7 +234,7 @@ def from_rdata_list(name: Union[dns.name.Name, str], ttl: int, return r -def from_rdata(name: Union[dns.name.Name, str], ttl:int, *rdatas) -> RRset: +def from_rdata(name: Union[dns.name.Name, str], ttl:int, *rdatas: Any) -> RRset: """Create an RRset with the specified name and TTL, and with the specified rdata objects. diff --git a/dns/tokenizer.py b/dns/tokenizer.py index 331bee3c..275861c6 100644 --- a/dns/tokenizer.py +++ b/dns/tokenizer.py @@ -444,7 +444,7 @@ class Tokenizer: ttype = EOF return Token(ttype, token, has_escape) - def unget(self, token: Token): + def unget(self, token: Token) -> None: """Unget a token. The unget buffer for tokens is only one token large; it is diff --git a/dns/transaction.py b/dns/transaction.py index f48d83ef..1e97c757 100644 --- a/dns/transaction.py +++ b/dns/transaction.py @@ -1,6 +1,6 @@ # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license -from typing import Callable, List, Optional, Tuple, Union +from typing import Any, Callable, List, Optional, Tuple, Union import collections @@ -144,7 +144,7 @@ class Transaction: if self.read_only: raise ReadOnly - def add(self, *args) -> None: + def add(self, *args: Any) -> None: """Add records. The arguments may be: @@ -159,7 +159,7 @@ class Transaction: self._check_read_only() self._add(False, args) - def replace(self, *args) -> None: + def replace(self, *args: Any) -> None: """Replace the existing rdataset at the name with the specified rdataset, or add the specified rdataset if there was no existing rdataset. @@ -180,7 +180,7 @@ class Transaction: self._check_read_only() self._add(True, args) - def delete(self, *args) -> None: + def delete(self, *args: Any) -> None: """Delete records. It is not an error if some of the records are not in the existing @@ -202,7 +202,7 @@ class Transaction: self._check_read_only() self._delete(False, args) - def delete_exact(self, *args) -> None: + def delete_exact(self, *args: Any) -> None: """Delete records. The arguments may be: @@ -329,7 +329,7 @@ class Transaction: """ self._check_delete_rdataset.append(check) - def check_delete_name(self, check: CheckDeleteNameType): + def check_delete_name(self, check: CheckDeleteNameType) -> None: """Call *check* before putting (storing) an rdataset. The function is called with the transaction and the name. diff --git a/dns/update.py b/dns/update.py index 9e9b113b..eb7b9364 100644 --- a/dns/update.py +++ b/dns/update.py @@ -47,7 +47,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] _section_enum = UpdateSection # type: ignore def __init__(self, zone: Optional[Union[dns.name.Name, str]]=None, - rdclass=dns.rdataclass.IN, + rdclass: dns.rdataclass.RdataClass=dns.rdataclass.IN, keyring: Optional[Any]=None, keyname: Optional[dns.name.Name]=None, keyalgorithm: Union[dns.name.Name, str]=dns.tsig.default_algorithm, id: Optional[int]=None): @@ -157,7 +157,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] self.origin) self._add_rr(name, ttl, rd, section=section) - def add(self, name: Union[dns.name.Name, str], *args) -> None: + def add(self, name: Union[dns.name.Name, str], *args: Any) -> None: """Add records. The first argument is always a name. The other @@ -172,7 +172,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] self._add(False, self.update, name, *args) - def delete(self, name: Union[dns.name.Name, str], *args) -> None: + def delete(self, name: Union[dns.name.Name, str], *args: Any) -> None: """Delete records. The first argument is always a name. The other @@ -212,11 +212,11 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] True, True) else: for s in largs: - rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s, + rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s, # type: ignore[arg-type] self.origin) self._add_rr(name, 0, rd, dns.rdataclass.NONE) - def replace(self, name: Union[dns.name.Name, str], *args) -> None: + def replace(self, name: Union[dns.name.Name, str], *args: Any) -> None: """Replace records. The first argument is always a name. The other @@ -234,7 +234,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] self._add(True, self.update, name, *args) - def present(self, name: Union[dns.name.Name, str], *args) -> None: + def present(self, name: Union[dns.name.Name, str], *args: Any) -> None: """Require that an owner name (and optionally an rdata type, or specific rdataset) exists as a prerequisite to the execution of the update. @@ -262,7 +262,7 @@ class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals] if not isinstance(args[0], dns.rdataset.Rdataset): # Add a 0 TTL largs = list(args) - largs.insert(0, 0) + largs.insert(0, 0) # type: ignore[arg-type] self._add(False, self.prerequisite, name, *largs) else: self._add(False, self.prerequisite, name, *args) diff --git a/dns/wire.py b/dns/wire.py index 87814eea..905930f7 100644 --- a/dns/wire.py +++ b/dns/wire.py @@ -1,6 +1,6 @@ # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license -from typing import Optional, Tuple +from typing import Iterator, Optional, Tuple import contextlib import struct @@ -65,7 +65,7 @@ class Parser: self.current = where @contextlib.contextmanager - def restrict_to(self, size: int): + def restrict_to(self, size: int) -> Iterator: assert size >= 0 if size > self.remaining(): raise dns.exception.FormError @@ -82,7 +82,7 @@ class Parser: self.end = saved_end @contextlib.contextmanager - def restore_furthest(self): + def restore_furthest(self) -> Iterator: try: yield None finally: diff --git a/dns/zone.py b/dns/zone.py index 91fb6970..d57838c6 100644 --- a/dns/zone.py +++ b/dns/zone.py @@ -520,8 +520,9 @@ class Zone(dns.transaction.TransactionManager): rrset = None return rrset - def iterate_rdatasets(self, rdtype=dns.rdatatype.ANY, - covers=dns.rdatatype.NONE) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]: + def iterate_rdatasets(self, rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.ANY, + covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE) \ + -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]: """Return a generator which yields (name, rdataset) tuples for all rdatasets in the zone which have the specified *rdtype* and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default, @@ -548,8 +549,9 @@ class Zone(dns.transaction.TransactionManager): (rds.rdtype == rdtype and rds.covers == covers): yield (name, rds) - def iterate_rdatas(self, rdtype=dns.rdatatype.ANY, - covers=dns.rdatatype.NONE) -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]: + def iterate_rdatas(self, rdtype: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.ANY, + covers: Union[dns.rdatatype.RdataType, str]=dns.rdatatype.NONE) \ + -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]: """Return a generator which yields (name, ttl, rdata) tuples for all rdatas in the zone which have the specified *rdtype* and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default, @@ -578,7 +580,7 @@ class Zone(dns.transaction.TransactionManager): yield (name, rds.ttl, rdata) def to_file(self, f: Any, sorted: bool=True, relativize: bool=True, nl: Optional[str]=None, - want_comments: bool=False, want_origin: bool=False): + want_comments: bool=False, want_origin: bool=False) -> None: """Write a zone to a file. *f*, a file or `str`. If *f* is a string, it is treated @@ -656,7 +658,7 @@ class Zone(dns.transaction.TransactionManager): f.write(nl) def to_text(self, sorted: bool=True, relativize: bool=True, nl: Optional[str]=None, - want_comments: bool=False, want_origin: bool=False): + want_comments: bool=False, want_origin: bool=False) -> str: """Return a zone's text as though it were written to a file. *sorted*, a ``bool``. If True, the default, then the file @@ -732,7 +734,7 @@ class Zone(dns.transaction.TransactionManager): raise NoSOA return soa[0] - def _compute_digest(self, hash_algorithm: DigestHashAlgorithm, scheme=DigestScheme.SIMPLE) -> bytes: + def _compute_digest(self, hash_algorithm: DigestHashAlgorithm, scheme: DigestScheme=DigestScheme.SIMPLE) -> bytes: hashinfo = _digest_hashers.get(hash_algorithm) if not hashinfo: raise UnsupportedDigestHashAlgorithm @@ -762,7 +764,7 @@ class Zone(dns.transaction.TransactionManager): return hasher.digest() def compute_digest(self, hash_algorithm: DigestHashAlgorithm, - scheme=DigestScheme.SIMPLE) -> dns.rdtypes.ANY.ZONEMD.ZONEMD: + scheme: DigestScheme=DigestScheme.SIMPLE) -> dns.rdtypes.ANY.ZONEMD.ZONEMD: serial = self.get_soa().serial digest = self._compute_digest(hash_algorithm, scheme) return dns.rdtypes.ANY.ZONEMD.ZONEMD(self.rdclass, @@ -894,7 +896,7 @@ class Version: self.nodes = {} self.origin = origin - def _validate_name(self, name: dns.name.Name): + def _validate_name(self, name: dns.name.Name) -> dns.name.Name: if name.is_absolute(): if self.origin is None: # This should probably never happen as other code (e.g. @@ -944,7 +946,7 @@ class WritableVersion(Version): self.origin = zone.origin self.changed: Set[dns.name.Name] = set() - def _maybe_cow(self, name: dns.name.Name): + def _maybe_cow(self, name: dns.name.Name) -> dns.node.Node: name = self._validate_name(name) node = self.nodes.get(name) if node is None or name not in self.changed: @@ -1208,7 +1210,7 @@ def from_file(f: Any, origin: Optional[Union[dns.name.Name, str]]=None, assert False # make mypy happy lgtm[py/unreachable-statement] -def from_xfr(xfr: Any, zone_factory=Zone, relativize: bool=True, check_origin: bool=True) -> Zone: +def from_xfr(xfr: Any, zone_factory: Any=Zone, relativize: bool=True, check_origin: bool=True) -> Zone: """Convert the output of a zone transfer generator into a zone object. *xfr*, a generator of ``dns.message.Message`` objects, typically