potype:
poetry run python -m mypy examples tests dns/*.py
+polint:
+ poetry run pylint dns/*.py
+
poflake:
poetry run flake8 dns
self.writer = writer
async def sendall(self, what, timeout):
- self.writer.write(what),
+ self.writer.write(what)
return await _maybe_wait_for(self.writer.drain(), timeout)
- raise dns.exception.Timeout(timeout=timeout)
async def recv(self, count, timeout):
return await _maybe_wait_for(self.reader.read(count),
timeout)
- raise dns.exception.Timeout(timeout=timeout)
async def close(self):
self.writer.close()
# for brevity
_lltuple = dns.inet.low_level_address_tuple
+# pylint: disable=redefined-outer-name
+
class DatagramSocket(dns._asyncbackend.DatagramSocket):
def __init__(self, socket):
# for brevity
_lltuple = dns.inet.low_level_address_tuple
+# pylint: disable=redefined-outer-name
+
class DatagramSocket(dns._asyncbackend.DatagramSocket):
def __init__(self, socket):
import dns.exception
+# pylint: disable=unused-import
+
from dns._asyncbackend import Socket, DatagramSocket, \
StreamSocket, Backend # noqa:
+# pylint: enable=unused-import
_default_backend = None
Raises NotImplementError if an unknown backend name is specified.
"""
+ # pylint: disable=import-outside-toplevel,redefined-outer-name
backend = _backends.get(name)
if backend:
return backend
Returns the name of the library, or raises AsyncLibraryNotFoundError
if the library cannot be determined.
"""
+ # pylint: disable=import-outside-toplevel
try:
if _no_sniffio:
raise ImportError
class Resolver(dns.resolver.Resolver):
+ # pylint: disable=invalid-overridden-method, arguments-differ
+
async def resolve(self, qname, rdtype=dns.rdatatype.A,
rdclass=dns.rdataclass.IN,
tcp=False, source=None, raise_on_no_answer=True,
if answer is not None:
return answer
+ # pylint: disable=signature-differs
+
async def query(self, *args, **kwargs):
# We have to define something here as we don't want to inherit the
# parent's query().
raise NotImplementedError
+ # pylint: enable=signature-differs
+
async def resolve_address(self, ipaddr, *args, **kwargs):
"""Use an asynchronous resolver to run a reverse query for PTR
records.
rdclass=dns.rdataclass.IN,
*args, **kwargs)
+ # pylint: disable=redefined-outer-name
+
async def canonical_name(self, name):
"""Determine the canonical name of *name*.
domain_encoded = domain.canonicalize().to_wire()
digest = hashlib.sha1(domain_encoded + salt_encoded).digest()
- for i in range(iterations):
+ for _ in range(iterations):
digest = hashlib.sha1(digest + salt_encoded).digest()
output = base64.b32encode(digest).decode("utf-8")
if seen_empty:
raise dns.exception.SyntaxError
seen_empty = True
- for i in range(0, 8 - l + 1):
+ for _ in range(0, 8 - l + 1):
canonical.append(b'0000')
else:
lc = len(c)
# What the caller picked is fine.
return value
+ # pylint: disable=unused-argument
+
def _parse_rr_header(self, section, name, rdclass, rdtype):
return (rdclass, rdtype, None, False)
+ # pylint: enable=unused-argument
+
def _parse_special_rr_header(self, section, count, position,
name, rdclass, rdtype):
if rdtype == dns.rdatatype.OPT:
# We avoid circular imports by doing this here. We do it in another
# function as doing it in _message_factory_from_opcode() makes "dns"
# a local symbol, and the first line fails :)
+
+ # pylint: disable=redefined-outer-name,import-outside-toplevel,unused-import
import dns.update # noqa: F401
"""
section = self.message.sections[section_number]
- for i in range(qcount):
+ for _ in range(qcount):
qname = self.parser.get_name(self.message.origin)
(rdtype, rdclass) = self.parser.get_struct('!HH')
(rdclass, rdtype, _, _) = \
self.opcode = dns.opcode.QUERY
self.flags = 0
- def _header_line(self, section):
+ def _header_line(self, _):
"""Process one line from the text format header section."""
token = self.tok.get()
Returns a ``bool``.
"""
- (nr, o, nl) = self.fullcompare(other)
+ (nr, _, _) = self.fullcompare(other)
if nr == NAMERELN_SUBDOMAIN or nr == NAMERELN_EQUAL:
return True
return False
Returns a ``bool``.
"""
- (nr, o, nl) = self.fullcompare(other)
+ (nr, _, _) = self.fullcompare(other)
if nr == NAMERELN_SUPERDOMAIN or nr == NAMERELN_EQUAL:
return True
return False
return (now, now + timeout)
-def _wait_for(fd, readable, writable, error, expiration):
+def _wait_for(fd, readable, writable, _, expiration):
# Use the selected selector class to wait for any of the specified
# events. An "expiration" absolute time is converted into a relative
# timeout.
+ #
+ # The unused parameter is 'error', which is always set when
+ # selecting for read or write, and we have no error-only selects.
if readable and isinstance(fd, ssl.SSLSocket) and fd.pending() > 0:
return True
raise NoDOH # pragma: no cover
wire = q.to_wire()
- (af, destination, source) = _destination_and_source(where, port,
- source, source_port,
- False)
+ (af, _, source) = _destination_and_source(where, port, source, source_port,
+ False)
transport_adapter = None
headers = {
"accept": "application/dns-message"
}
- try:
- where_af = dns.inet.af_for_address(where)
- if where_af == socket.AF_INET:
+ if af is not None:
+ if af == socket.AF_INET:
url = 'https://{}:{}{}'.format(where, port, path)
- elif where_af == socket.AF_INET6:
+ elif af == socket.AF_INET6:
url = 'https://[{}]:{}{}'.format(where, port, path)
- except ValueError:
- if bootstrap_address is not None:
- split_url = urllib.parse.urlsplit(where)
- headers['Host'] = split_url.hostname
- url = where.replace(split_url.hostname, bootstrap_address)
- transport_adapter = HostHeaderSSLAdapter()
- else:
- url = where
+ elif bootstrap_address is not None:
+ split_url = urllib.parse.urlsplit(where)
+ headers['Host'] = split_url.hostname
+ url = where.replace(split_url.hostname, bootstrap_address)
+ transport_adapter = HostHeaderSSLAdapter()
+ else:
+ url = where
if source is not None:
# set source port and source address
transport_adapter = SourceAddressAdapter(source)
(expiration is not None and mexpiration > expiration):
mexpiration = expiration
if use_udp:
- (wire, from_address) = _udp_recv(s, 65535, expiration)
+ (wire, _) = _udp_recv(s, 65535, expiration)
else:
ldata = _net_read(s, 2, mexpiration)
(l,) = struct.unpack("!H", ldata)
elif ttl < self.ttl:
self.ttl = ttl
- def add(self, rd, ttl=None):
+ def add(self, rd, ttl=None): # pylint: disable=arguments-differ
"""Add the specified rdata to the rdataset.
If the optional *ttl* parameter is supplied, then
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-import dns.rdtypes.mxbase
import struct
+import dns.rdtypes.mxbase
+
class A(dns.rdata.Rdata):
"""A record for Chaosnet"""
import dns.tsig
if sys.platform == 'win32':
+ # pylint: disable=import-error
import winreg
class NXDOMAIN(dns.exception.DNSException):
supp_kwargs = {'qnames', 'responses'}
fmt = None # we have our own __str__ implementation
- def _check_kwargs(self, qnames, responses=None):
+ # pylint: disable=arguments-differ
+
+ def _check_kwargs(self, qnames,
+ responses=None):
if not isinstance(qnames, (list, tuple, set)):
raise AttributeError("qnames must be a list, tuple or set")
if len(qnames) == 0:
def _fmt_kwargs(self, **kwargs):
srv_msgs = []
for err in kwargs['errors']:
+ # pylint: disable=bad-continuation
srv_msgs.append('Server {} {} port {} answered {}'.format(err[0],
'TCP' if err[1] else 'UDP', err[2], err[3]))
return super()._fmt_kwargs(query=kwargs['request'].question,
self.search.append(dns.name.from_text(s))
def _config_win32_fromkey(self, key, always_try_domain):
+ # pylint: disable=undefined-variable
+ # (disabled for WindowsError)
try:
- servers, rtype = winreg.QueryValueEx(key, 'NameServer')
- except WindowsError: # pylint: disable=undefined-variable
+ servers, _ = winreg.QueryValueEx(key, 'NameServer')
+ except WindowsError: # pragma: no cover
servers = None
if servers:
self._config_win32_nameservers(servers)
if servers or always_try_domain:
try:
- dom, rtype = winreg.QueryValueEx(key, 'Domain')
+ dom, _ = winreg.QueryValueEx(key, 'Domain')
if dom:
self._config_win32_domain(dom) # pragma: no cover
except WindowsError: # pragma: no cover
pass
else:
try:
- servers, rtype = winreg.QueryValueEx(key, 'DhcpNameServer')
+ servers, _ = winreg.QueryValueEx(key, 'DhcpNameServer')
except WindowsError: # pragma: no cover
servers = None
if servers:
self._config_win32_nameservers(servers)
try:
- dom, rtype = winreg.QueryValueEx(key, 'DhcpDomain')
+ dom, _ = winreg.QueryValueEx(key, 'DhcpDomain')
if dom:
self._config_win32_domain(dom)
except WindowsError: # pragma: no cover
pass
try:
- search, rtype = winreg.QueryValueEx(key, 'SearchList')
+ search, _ = winreg.QueryValueEx(key, 'SearchList')
except WindowsError: # pragma: no cover
search = None
if search: # pragma: no cover
try:
guid = winreg.EnumKey(interfaces, i)
i += 1
+ # XXXRTH why do we get this key and then not use it?
key = winreg.OpenKey(interfaces, guid)
if not self._win32_is_nic_enabled(lm, guid, key):
continue
finally:
lm.Close()
- def _win32_is_nic_enabled(self, lm, guid,
- interface_key):
+ def _win32_is_nic_enabled(self, lm, guid, _):
# Look in the Windows Registry to determine whether the network
# interface corresponding to the given guid is enabled.
#
def resolve(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
tcp=False, source=None, raise_on_no_answer=True, source_port=0,
- lifetime=None, search=None):
+ lifetime=None, search=None): # pylint: disable=arguments-differ
"""Query nameservers to find the answer to the question.
The *qname*, *rdtype*, and *rdclass* parameters may be objects
rdclass=dns.rdataclass.IN,
*args, **kwargs)
+ # pylint: disable=redefined-outer-name
+
def canonical_name(self, name):
"""Determine the canonical name of *name*.
canonical_name = e.canonical_name
return canonical_name
+ # pylint: enable=redefined-outer-name
+
def use_tsig(self, keyring, keyname=None,
algorithm=dns.tsig.default_algorithm):
"""Add a TSIG signature to each query.
raise socket.gaierror(socket.EAI_NONAME, 'Name or service not known')
v6addrs = []
v4addrs = []
- canonical_name = None
+ canonical_name = None # pylint: disable=redefined-outer-name
# Is host None or an address literal? If so, use the system's
# getaddrinfo().
if host is None:
'Name or service not known')
sockaddr = (ip, 80)
family = socket.AF_INET
- (name, port) = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
+ (name, _) = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
aliases = []
addresses = []
tuples = _getaddrinfo(name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP,
return self.to_text()
def __eq__(self, other):
- if not isinstance(other, RRset):
- return False
- if self.name != other.name:
+ if isinstance(other, RRset):
+ if self.name != other.name:
+ return False
+ elif not isinstance(other, dns.rdataset.Rdataset):
return False
return super().__eq__(other)
- def match(self, name, rdclass, rdtype, covers, deleting=None):
+ # pylint: disable=arguments-differ
+
+ def match(self, name, rdclass, rdtype, covers,
+ deleting=None):
"""Returns ``True`` if this rrset matches the specified class, type,
covers, and deletion state.
"""
return False
return True
- def to_text(self, origin=None, relativize=True, **kw):
+ def to_text(self, origin=None, relativize=True,
+ **kw):
"""Convert the RRset into DNS master file format.
See ``dns.name.Name.choose_relativity`` for more information
return super().to_text(self.name, origin, relativize,
self.deleting, **kw)
- def to_wire(self, file, compress=None, origin=None, **kw):
+ def to_wire(self, file, compress=None, origin=None,
+ **kw):
"""Convert the RRset to wire format.
All keyword arguments are passed to ``dns.rdataset.to_wire()``; see
return super().to_wire(self.name, file, compress, origin,
self.deleting, **kw)
+ # pylint: enable=arguments-differ
+
def to_rdataset(self):
"""Convert an RRset into an Rdataset.
# rhs (required)
rhs = token.value
- lmod, lsign, loffset, lwidth, lbase = self._parse_modify(lhs)
- rmod, rsign, roffset, rwidth, rbase = self._parse_modify(rhs)
+ # The code currently only supports base 'd', so the last value
+ # in the tuple _parse_modify returns is ignored
+ lmod, lsign, loffset, lwidth, _ = self._parse_modify(lhs)
+ rmod, rsign, roffset, rwidth, _ = self._parse_modify(rhs)
for i in range(start, stop + 1, step):
# +1 because bind is inclusive and python is exclusive
enable=
all,
- python3
# It's worth looking at len-as-condition for optimization, but it's disabled
# here as it is not a correctness thing. Similarly eq-without-hash is
disable=
R,
I,
- anomalous-backslash-in-string,
- arguments-differ,
- assigning-non-slot,
- bad-builtin,
- bad-continuation,
broad-except,
- deprecated-method,
fixme,
global-statement,
invalid-name,
- missing-docstring,
+ missing-module-docstring,
+ missing-class-docstring,
+ missing-function-docstring,
no-absolute-import,
- no-member,
+ no-member, # We'd like to use this, but our immutability is too hard for it
protected-access,
redefined-builtin,
too-many-lines,
- unused-argument,
- unused-variable,
- wrong-import-order,
- wrong-import-position,
- len-as-condition,
- eq-without-hash,
- next-method-defined,
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html. You can also give a reporter class, eg
# mypackage.mymodule.MyReporterClass.
-output-format=colorized
+#output-format=colorized
+output-format=parseable
# Tells whether to display a full report or only the messages
reports=no