dns.reversename.from_address(ipaddr), *args, **modified_kwargs
)
-
async def resolve_name(
self,
name: Union[dns.name.Name, str],
family: int = socket.AF_UNSPEC,
- **kwargs: Any
+ **kwargs: Any,
) -> dns.resolver.HostAnswers:
"""Use an asynchronous resolver to query for address records.
elif family != socket.AF_UNSPEC:
raise NotImplementedError(f"unknown address family {family}")
- raise_on_no_answer = modified_kwargs.pop('raise_on_no_answer', True)
- lifetime = modified_kwargs.pop('lifetime', None)
+ raise_on_no_answer = modified_kwargs.pop("raise_on_no_answer", True)
+ lifetime = modified_kwargs.pop("lifetime", None)
start = time.time()
- v6 = await self.resolve(name, dns.rdatatype.AAAA,
- raise_on_no_answer=False,
- lifetime=self._compute_timeout(start, lifetime),
- **modified_kwargs)
+ v6 = await self.resolve(
+ name,
+ dns.rdatatype.AAAA,
+ raise_on_no_answer=False,
+ lifetime=self._compute_timeout(start, lifetime),
+ **modified_kwargs,
+ )
# Note that setting name ensures we query the same name
# for A as we did for AAAA. (This is just in case search lists
# are active by default in the resolver configuration and
# we might be talking to a server that says NXDOMAIN when it
# wants to say NOERROR no data.
name = v6.qname
- v4 = await self.resolve(name, dns.rdatatype.A,
- raise_on_no_answer=False,
- lifetime=self._compute_timeout(start, lifetime),
- **modified_kwargs)
+ v4 = await self.resolve(
+ name,
+ dns.rdatatype.A,
+ raise_on_no_answer=False,
+ lifetime=self._compute_timeout(start, lifetime),
+ **modified_kwargs,
+ )
answers = dns.resolver.HostAnswers.make(
- v6=v6,
- v4=v4,
- add_empty=not raise_on_no_answer
+ v6=v6, v4=v4, add_empty=not raise_on_no_answer
)
if not answers:
raise NoAnswer(response=v6.response)
return answers
-
# pylint: disable=redefined-outer-name
async def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
async def resolve_name(
- name: Union[dns.name.Name, str],
- family: int = socket.AF_UNSPEC,
- **kwargs: Any
+ name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any
) -> dns.resolver.HostAnswers:
"""Use a resolver to asynchronously query for address records.
class EntropyPool:
-
# This is an entropy pool for Python implementations that do not
# have a working SystemRandom. I'm not sure there are any, but
# leaving this code doesn't hurt anything as the library code
s.write("payload %d\n" % self.payload)
for opt in self.options:
s.write("option %s\n" % opt.to_text())
- for (name, which) in self._section_enum.__members__.items():
+ for name, which in self._section_enum.__members__.items():
s.write(f";{name}\n")
for rrset in self.section_from_number(which):
s.write(rrset.to_text(origin, relativize, **kw))
or self.rdclass != other.rdclass
or self.rdtype != other.rdtype
):
-
return NotImplemented
return self._cmp(other) < 0
@rtype: dict"""
keyring = {}
- for (name, value) in textring.items():
+ for name, value in textring.items():
kname = dns.name.from_text(name)
if isinstance(value, str):
keyring[kname] = dns.tsig.Key(kname, value).secret
def b64encode(secret):
return base64.encodebytes(secret).decode().rstrip()
- for (name, key) in keyring.items():
+ for name, key in keyring.items():
tname = name.to_text()
if isinstance(key, bytes):
textring[tname] = b64encode(key)
class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals]
-
# ignore the mypy error here as we mean to use a different enum
_section_enum = UpdateSection # type: ignore
import sys
if sys.platform == "win32":
-
from typing import Any
import dns.name
class DecoratorTestCase(unittest.TestCase):
-
immutable_module = dns._immutable_ctx
def make_classes(self):
self.assertFalse(hasattr(a, "b"))
def test_no_collateral_damage(self):
-
# A and B are immutable but not related. The magic that lets
# us write to immutable things while initializing B should not let
# B mess with A.
class NSEC3Hash(unittest.TestCase):
-
DATA = [
# Source: https://tools.ietf.org/html/rfc5155#appendix-A
("example", "aabbccdd", 12, "0p9mhaveqvm6t7vbl5lop2u3t2rp3tom", 1),
("255.255.255.255", b"\xff\xff\xff\xff"),
("0.0.0.0", b"\x00\x00\x00\x00"),
]
- for (t, b) in pairs:
+ for t, b in pairs:
b1 = aton4(t)
t1 = ntoa4(b1)
self.assertEqual(b1, b)
self.assertRaises(dns.exception.SyntaxError, bad)
def test_ptontop(self):
- for (af, a) in [
+ for af, a in [
(socket.AF_INET, "1.2.3.4"),
(socket.AF_INET6, "2001:db8:0:1:1:1:1:1"),
]:
self.assertEqual(dns.inet.inet_ntop(af, dns.inet.inet_pton(af, a)), a)
def test_isaddress(self):
- for (t, e) in [
+ for t, e in [
("1.2.3.4", True),
("2001:db8:0:1:1:1:1:1", True),
("hello world", False),
response = dns.message.make_response(request.message)
response.question = []
response.flags |= dns.flags.AA
- for (name, rdataset) in self.zone.iterate_rdatasets():
+ for name, rdataset in self.zone.iterate_rdatasets():
if rdataset.rdtype == dns.rdatatype.SOA and name == dns.name.empty:
continue
rrset = dns.rrset.RRset(
class RdTypeAndClassTestCase(unittest.TestCase):
-
# Classes
def test_class_meta1(self):
soa_name = zone.origin
soa = zone.find_rdataset(soa_name, "SOA")
add_rdataset(msg, soa_name, soa)
- for (name, rds) in zone.iterate_rdatasets():
+ for name, rds in zone.iterate_rdatasets():
if rds.rdtype == dns.rdatatype.SOA:
continue
add_rdataset(msg, name, rds)