[flake8]
-ignore = E741
+ignore =
+ # Prefer emacs indentation of continued lines
+ E126,
+ E129,
+ # Whitespace round parameter '=' can be excessive
+ E252,
+ # Not excited by the "two blank lines" rule
+ E302,
+ E305,
+ # Ambigious variables are ok.
+ E741,
+ # Lines ending with binary operators are OK
+ W504,
+
+max-line-length = 80
'zone',
]
-from dns.version import version as __version__
+from dns.version import version as __version__ # noqa
sig = rrsig.signature
elif _is_gost(rrsig.algorithm):
raise UnsupportedAlgorithm(
- 'algorithm "%s" not supported by dnspython' % algorithm_to_text(rrsig.algorithm))
+ 'algorithm "%s" not supported by dnspython' %
+ algorithm_to_text(rrsig.algorithm))
else:
raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
# Raise here for code clarity; this won't actually ever happen
# since if the algorithm is really unknown we'd already have
# raised an exception above
- raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
- # If we got here, we successfully verified so we can return without error
+ raise ValidationFailure('unknown algorithm %u' %
+ rrsig.algorithm)
+ # If we got here, we successfully verified so we can return
+ # without error
return
except InvalidSignature:
# this happens on an individual validation failure
self.scopelen = scopelen
addrdata = dns.inet.inet_pton(af, address)
- nbytes = int(math.ceil(srclen/8.0))
+ nbytes = int(math.ceil(srclen / 8.0))
# Truncate to srclen and pad to the end of the last octet needed
# See RFC section 6
def to_text(self):
return "ECS {}/{} scope/{}".format(self.address, self.srclen,
self.scopelen)
+
@staticmethod
def from_text(text):
"""Convert a string into a `dns.edns.ECSOption`
try:
scope = int(scope)
except ValueError:
- raise ValueError('invalid scope "{}": scope must be an integer'.format(scope))
+ raise ValueError('invalid scope ' +
+ '"{}": scope must be an integer'.format(scope))
try:
srclen = int(srclen)
except ValueError:
- raise ValueError('invalid srclen "{}": srclen must be an integer'.format(srclen))
+ raise ValueError('invalid srclen ' +
+ '"{}": srclen must be an integer'.format(srclen))
return ECSOption(address, srclen, scope)
def to_wire(self, file):
@classmethod
def from_wire(cls, otype, wire, cur, olen):
- family, src, scope = struct.unpack('!HBB', wire[cur:cur+4])
+ family, src, scope = struct.unpack('!HBB', wire[cur:cur + 4])
cur += 4
- addrlen = int(math.ceil(src/8.0))
+ addrlen = int(math.ceil(src / 8.0))
if family == 1:
pad = 4 - addrlen
- addr = dns.ipv4.inet_ntoa(wire[cur:cur+addrlen] + b'\x00' * pad)
+ addr = dns.ipv4.inet_ntoa(wire[cur:cur + addrlen] + b'\x00' * pad)
elif family == 2:
pad = 16 - addrlen
- addr = dns.ipv6.inet_ntoa(wire[cur:cur+addrlen] + b'\x00' * pad)
+ addr = dns.ipv6.inet_ntoa(wire[cur:cur + addrlen] + b'\x00' * pad)
else:
raise ValueError('unsupported family')
try:
AF_INET6 = socket.AF_INET6
except AttributeError:
- AF_INET6 = 9999 # type: ignore
+ AF_INET6 = 9999 # type: ignore
def inet_pton(family, text):
try:
dns.ipv6.inet_aton(text, True)
return AF_INET6
- except:
+ except Exception:
raise ValueError
try:
b = [int(part) for part in parts]
return struct.pack('BBBB', *b)
- except:
+ except Exception:
raise dns.exception.SyntaxError
i = 0
l = len(hex)
while i < l:
- chunk = hex[i : i + 4].decode()
+ chunk = hex[i:i + 4].decode()
# strip leading zeros. we do this with an re instead of
# with lstrip() because lstrip() didn't support chars until
# python 2.2.2
m = _leading_zero.match(chunk)
- if not m is None:
+ if m is not None:
chunk = m.group(1)
chunks.append(chunk)
i += 4
# Get rid of the icky dot-quad syntax if we have it.
#
m = _v4_ending.match(text)
- if not m is None:
+ if m is not None:
b = dns.ipv4.inet_aton(m.group(2))
text = (u"{}:{:02x}{:02x}:{:02x}{:02x}".format(m.group(1).decode(),
b[0], b[1], b[2],
# turn '<whatever>::' into '<whatever>:'
#
m = _colon_colon_start.match(text)
- if not m is None:
+ if m is not None:
text = text[1:]
else:
m = _colon_colon_end.match(text)
- if not m is None:
+ if m is not None:
text = text[:-1]
#
# Now canonicalize into 8 chunks of 4 hex digits each
except ImportError:
have_doh = False
-
try:
import ssl
except ImportError:
class ssl(object): # type: ignore
+
class WantReadException(Exception):
pass
+
class WantWriteException(Exception):
pass
+
class SSLSocket(object):
pass
+
def create_default_context(self, *args, **kwargs):
raise Exception('no ssl support')
})
response = session.post(url, headers=headers, data=wire,
stream=True, timeout=timeout,
- verify=verify)
+ verify=verify)
else:
wire = base64.urlsafe_b64encode(wire).decode('utf-8').strip("=")
url += "?dns={}".format(wire)
(l,) = struct.unpack("!H", ldata)
wire = _net_read(s, l, mexpiration)
is_ixfr = (rdtype == dns.rdatatype.IXFR)
- r = dns.message.from_wire(wire, keyring=q.keyring, request_mac=q.mac,
- xfr=True, origin=origin, tsig_ctx=tsig_ctx,
+ r = dns.message.from_wire(wire, keyring=q.keyring,
+ request_mac=q.mac, xfr=True,
+ origin=origin, tsig_ctx=tsig_ctx,
multi=True, first=first,
one_rr_per_rrset=is_ixfr)
rcode = r.rcode()
delete_mode = not delete_mode
#
# If this SOA RRset is equal to the first we saw then we're
- # finished. If this is an IXFR we also check that we're seeing
- # the record in the expected part of the response.
+ # finished. If this is an IXFR we also check that we're
+ # seeing the record in the expected part of the response.
#
if rrset == soa_rrset and \
(rdtype == dns.rdatatype.AXFR or
try:
import ssl
except ImportError:
- class ssl(object): # type: ignore
+ class ssl(object): # type: ignore
SSLContext : Dict = {}
have_doh: bool
return True
return False
-
-def register_type(rdtype, rdtype_text, is_singleton=False): # pylint: disable=redefined-outer-name
+# pylint: disable=redefined-outer-name
+def register_type(rdtype, rdtype_text, is_singleton=False):
"""Dynamically register an rdatatype.
*rdtype*, an ``int``, the rdatatype to register.
"""AVC record"""
- # See: U{http://www.iana.org/assignments/dns-parameters/AVC/avc-completed-template}
+ # See: IANA dns parameters for AVC
def to_text(self, origin=None, relativize=True, **kw):
return '{} {} {}'.format(self.latitude.decode(),
- self.longitude.decode(),
- self.altitude.decode())
+ self.longitude.decode(),
+ self.altitude.decode())
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
def to_text(self, origin=None, relativize=True, **kw):
if self.subaddress:
return '"{}" "{}"'.format(dns.rdata._escapify(self.address),
- dns.rdata._escapify(self.subaddress))
+ dns.rdata._escapify(self.subaddress))
else:
return '"%s"' % dns.rdata._escapify(self.address)
what *= -1
else:
sign = 1
- what = round(what * 3600000) # pylint: disable=round-builtin
+ what = round(what * 3600000) # pylint: disable=round-builtin
degrees = int(what // 3600000)
what -= degrees * 3600000
minutes = int(what // 60000)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
- (domain, cused) = dns.name.from_wire(wire[: current + rdlen-2],
- current)
+ (domain, cused) = dns.name.from_wire(wire[:current + rdlen - 2],
+ current)
current += cused
- (address,) = struct.unpack('!H', wire[current: current + 2])
- if cused+2 != rdlen:
+ (address,) = struct.unpack('!H', wire[current:current + 2])
+ if cused + 2 != rdlen:
raise dns.exception.FormError
if origin is not None:
domain = domain.relativize(origin)
"""IPSECKEY record"""
- #see: RFC 4025
+ # see: RFC 4025
__slots__ = ['precedence', 'gateway_type', 'algorithm', 'gateway', 'key']
@property
def canonical_name(self):
"""Return the unresolved canonical name."""
- if not 'qnames' in self.kwargs:
+ if 'qnames' not in self.kwargs:
raise TypeError("parametrized exception required")
IN = dns.rdataclass.IN
CNAME = dns.rdatatype.CNAME
all_nxdomain = True
nxdomain_responses = {}
start = time.time()
- _qname = None # make pylint happy
+ _qname = None # make pylint happy
for _qname in qnames_to_try:
if self.cache:
answer = self.cache.get((_qname, rdtype, rdclass))
else:
tcp_attempt = tcp
if tcp:
- response = dns.query.tcp(request, nameserver,
- timeout=timeout,
- port=port,
- source=source,
- source_port=\
- source_port)
+ response = \
+ dns.query.tcp(request, nameserver,
+ timeout=timeout,
+ port=port,
+ source=source,
+ source_port=source_port)
else:
try:
- response = dns.query.udp(request,
- nameserver,
- timeout=timeout,
- port=port,
- source=source,
- source_port=\
- source_port)
+ response = \
+ dns.query.udp(request,
+ nameserver,
+ timeout=timeout,
+ port=port,
+ source=source,
+ source_port=source_port)
except dns.message.Truncated:
# Response truncated; retry with TCP.
tcp_attempt = True
``list``.
"""
if isinstance(nameservers, list):
- self._nameservers = nameservers # pylint: disable=attribute-defined-outside-init
+ self._nameservers = nameservers
else:
raise ValueError('nameservers must be a list'
' (not a {})'.format(type(nameservers)))
return _original_getaddrinfo(host, service, family, socktype,
proto, flags)
try:
- af = dns.inet.af_for_address(host)
+ # We don't care about the result of af_for_address(), we're just
+ # calling it so it raises an exception if host is not an IPv4 or
+ # IPv6 address.
+ dns.inet.af_for_address(host)
return _original_getaddrinfo(host, service, family, socktype,
proto, flags)
except Exception:
def __iter__(self):
return self.nodes.__iter__()
- def iterkeys(self):
- return self.nodes.keys() # pylint: disable=dict-keys-not-iterating
-
def keys(self):
- return self.nodes.keys() # pylint: disable=dict-keys-not-iterating
-
- def itervalues(self):
- return self.nodes.values() # pylint: disable=dict-values-not-iterating
+ return self.nodes.keys() # pylint: disable=dict-keys-not-iterating
def values(self):
- return self.nodes.values() # pylint: disable=dict-values-not-iterating
+ return self.nodes.values() # pylint: disable=dict-values-not-iterating
def items(self):
- return self.nodes.items() # pylint: disable=dict-items-not-iterating
-
- iteritems = items
+ return self.nodes.items() # pylint: disable=dict-items-not-iterating
def get(self, key):
key = self._validate_name(key)
rdtype = dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
covers = dns.rdatatype.from_text(covers)
- for (name, node) in self.iteritems(): # pylint: disable=dict-iter-method
+ for (name, node) in self.items():
for rds in node:
if rdtype == dns.rdatatype.ANY or \
(rds.rdtype == rdtype and rds.covers == covers):
rdtype = dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
covers = dns.rdatatype.from_text(covers)
- for (name, node) in self.iteritems(): # pylint: disable=dict-iter-method
+ for (name, node) in self.items():
for rds in node:
if rdtype == dns.rdatatype.ANY or \
(rds.rdtype == rdtype and rds.covers == covers):
names = list(self.keys())
names.sort()
else:
- names = self.iterkeys() # pylint: disable=dict-iter-method
+ names = self.keys()
for n in names:
l = self[n].to_text(n, origin=self.origin,
relativize=relativize)
# Type
try:
rdtype = dns.rdatatype.from_text(token.value)
- except:
+ except Exception:
raise dns.exception.SyntaxError(
"unknown rdatatype '%s'" % token.value)
n = self.zone.nodes.get(name)
# Catch and reraise.
(ty, va) = sys.exc_info()[:2]
raise va
- except:
+ except Exception:
# All exceptions that occur in the processing of rdata
# are treated as syntax errors. This is not strictly
# correct, but it is correct almost all of the time.
token = self.tok.get()
if not token.is_identifier():
raise dns.exception.SyntaxError
- except:
+ except Exception:
raise dns.exception.SyntaxError
# lhs (required)
token = self.tok.get()
if not token.is_identifier():
raise dns.exception.SyntaxError
- except:
+ except Exception:
raise dns.exception.SyntaxError
# TTL
# lhs (required)
try:
rhs = token.value
- except:
+ except Exception:
raise dns.exception.SyntaxError
lmod, lsign, loffset, lwidth, lbase = self._parse_modify(lhs)
# Catch and reraise.
(ty, va) = sys.exc_info()[:2]
raise va
- except:
+ except Exception:
# All exceptions that occur in the processing of rdata
# are treated as syntax errors. This is not strictly
# correct, but it is correct almost all of the time.