_network_available = False
+# Probe for IPv4 and IPv6
+query_addresses = []
+for (af, address) in ((socket.AF_INET, '8.8.8.8'),
+ (socket.AF_INET6, '2001:4860:4860::8888')):
+ try:
+ with socket.socket(af, socket.SOCK_DGRAM) as s:
+ # Connecting a UDP socket is supposed to return ENETUNREACH if
+ # no route to the network is present.
+ s.connect((address, 53))
+ query_addresses.append(address)
+ except Exception:
+ pass
+
+
@unittest.skipIf(not _network_available, "Internet not reachable")
class AsyncTests(unittest.TestCase):
self.assertRaises(dns.resolver.NotAbsolute, bad)
def testQueryUDP(self):
- qname = dns.name.from_text('dns.google.')
- async def run():
- q = dns.message.make_query(qname, dns.rdatatype.A)
- return await dns.asyncquery.udp(q, '8.8.8.8')
- response = self.async_run(run)
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ async def run():
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ return await dns.asyncquery.udp(q, address)
+ response = self.async_run(run)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
def testQueryUDPWithSocket(self):
- qname = dns.name.from_text('dns.google.')
- async def run():
- async with await self.backend.make_socket(socket.AF_INET,
- socket.SOCK_DGRAM) as s:
- q = dns.message.make_query(qname, dns.rdatatype.A)
- return await dns.asyncquery.udp(q, '8.8.8.8', sock=s)
- response = self.async_run(run)
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ async def run():
+ async with await self.backend.make_socket(
+ dns.inet.af_for_address(address),
+ socket.SOCK_DGRAM) as s:
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ return await dns.asyncquery.udp(q, address, sock=s)
+ response = self.async_run(run)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
def testQueryTCP(self):
- qname = dns.name.from_text('dns.google.')
- async def run():
- q = dns.message.make_query(qname, dns.rdatatype.A)
- return await dns.asyncquery.tcp(q, '8.8.8.8')
- response = self.async_run(run)
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ async def run():
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ return await dns.asyncquery.tcp(q, address)
+ response = self.async_run(run)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
def testQueryTCPWithSocket(self):
- qname = dns.name.from_text('dns.google.')
- async def run():
- async with await self.backend.make_socket(socket.AF_INET,
- socket.SOCK_STREAM, 0,
- None,
- ('8.8.8.8', 53)) as s:
- q = dns.message.make_query(qname, dns.rdatatype.A)
- return await dns.asyncquery.tcp(q, '8.8.8.8', sock=s)
- response = self.async_run(run)
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ async def run():
+ async with await self.backend.make_socket(
+ dns.inet.af_for_address(address),
+ socket.SOCK_STREAM, 0,
+ None,
+ (address, 53)) as s:
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ return await dns.asyncquery.tcp(q, address, sock=s)
+ response = self.async_run(run)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
@unittest.skipIf(not _ssl_available, "SSL not available")
def testQueryTLS(self):
- qname = dns.name.from_text('dns.google.')
- async def run():
- q = dns.message.make_query(qname, dns.rdatatype.A)
- return await dns.asyncquery.tls(q, '8.8.8.8')
- response = self.async_run(run)
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ async def run():
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ return await dns.asyncquery.tls(q, address)
+ response = self.async_run(run)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
@unittest.skipIf(not _ssl_available, "SSL not available")
def testQueryTLSWithSocket(self):
- qname = dns.name.from_text('dns.google.')
- async def run():
- ssl_context = ssl.create_default_context()
- ssl_context.check_hostname = False
- async with await self.backend.make_socket(socket.AF_INET,
- socket.SOCK_STREAM, 0,
- None,
- ('8.8.8.8', 853), None,
- ssl_context, None) as s:
- q = dns.message.make_query(qname, dns.rdatatype.A)
- return await dns.asyncquery.tls(q, '8.8.8.8', sock=s)
- response = self.async_run(run)
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ async def run():
+ ssl_context = ssl.create_default_context()
+ ssl_context.check_hostname = False
+ async with await self.backend.make_socket(
+ dns.inet.af_for_address(address),
+ socket.SOCK_STREAM, 0,
+ None,
+ (address, 853), None,
+ ssl_context, None) as s:
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ return await dns.asyncquery.tls(q, '8.8.8.8', sock=s)
+ response = self.async_run(run)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
def testQueryUDPFallback(self):
- qname = dns.name.from_text('.')
- async def run():
- q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
- return await dns.asyncquery.udp_with_fallback(q, '8.8.8.8')
- (_, tcp) = self.async_run(run)
- self.assertTrue(tcp)
+ for address in query_addresses:
+ qname = dns.name.from_text('.')
+ async def run():
+ q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
+ return await dns.asyncquery.udp_with_fallback(q, address)
+ (_, tcp) = self.async_run(run)
+ self.assertTrue(tcp)
def testQueryUDPFallbackNoFallback(self):
- qname = dns.name.from_text('dns.google.')
- async def run():
- q = dns.message.make_query(qname, dns.rdatatype.A)
- return await dns.asyncquery.udp_with_fallback(q, '8.8.8.8')
- (_, tcp) = self.async_run(run)
- self.assertFalse(tcp)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ async def run():
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ return await dns.asyncquery.udp_with_fallback(q, address)
+ (_, tcp) = self.async_run(run)
+ self.assertFalse(tcp)
try:
import trio
have_ssl = False
import dns.exception
+import dns.inet
import dns.message
import dns.name
import dns.rdataclass
class Server(object):
pass
+# Probe for IPv4 and IPv6
+query_addresses = []
+for (af, address) in ((socket.AF_INET, '8.8.8.8'),
+ (socket.AF_INET6, '2001:4860:4860::8888')):
+ try:
+ with socket.socket(af, socket.SOCK_DGRAM) as s:
+ # Connecting a UDP socket is supposed to return ENETUNREACH if
+ # no route to the network is present.
+ s.connect((address, 53))
+ query_addresses.append(address)
+ except Exception:
+ pass
+
@unittest.skipIf(not _network_available, "Internet not reachable")
class QueryTests(unittest.TestCase):
def testQueryUDP(self):
- qname = dns.name.from_text('dns.google.')
- q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.udp(q, '8.8.8.8')
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
-
- def testQueryUDPWithSocket(self):
- with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
- s.setblocking(0)
+ for address in query_addresses:
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.udp(q, '8.8.8.8', sock=s)
+ response = dns.query.udp(q, address)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
self.assertTrue('8.8.8.8' in seen)
self.assertTrue('8.8.4.4' in seen)
- def testQueryTCP(self):
- qname = dns.name.from_text('dns.google.')
- q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tcp(q, '8.8.8.8')
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
+ def testQueryUDPWithSocket(self):
+ for address in query_addresses:
+ with socket.socket(dns.inet.af_for_address(address),
+ socket.SOCK_DGRAM) as s:
+ s.setblocking(0)
+ qname = dns.name.from_text('dns.google.')
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ response = dns.query.udp(q, address, sock=s)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
- def testQueryTCPWithSocket(self):
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- s.connect(('8.8.8.8', 53))
- s.setblocking(0)
+ def testQueryTCP(self):
+ for address in query_addresses:
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tcp(q, None, sock=s)
+ response = dns.query.tcp(q, address)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
self.assertTrue('8.8.8.8' in seen)
self.assertTrue('8.8.4.4' in seen)
- def testQueryTLS(self):
- qname = dns.name.from_text('dns.google.')
- q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tls(q, '8.8.8.8')
- rrs = response.get_rrset(response.answer, qname,
- dns.rdataclass.IN, dns.rdatatype.A)
- self.assertTrue(rrs is not None)
- seen = set([rdata.address for rdata in rrs])
- self.assertTrue('8.8.8.8' in seen)
- self.assertTrue('8.8.4.4' in seen)
-
- @unittest.skipUnless(have_ssl, "No SSL support")
- def testQueryTLSWithSocket(self):
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as base_s:
- base_s.connect(('8.8.8.8', 853))
- ctx = ssl.create_default_context()
- with ctx.wrap_socket(base_s, server_hostname='dns.google') as s:
+ def testQueryTCPWithSocket(self):
+ for address in query_addresses:
+ with socket.socket(dns.inet.af_for_address(address),
+ socket.SOCK_STREAM) as s:
+ ll = dns.inet.low_level_address_tuple((address, 53))
+ s.connect(ll)
s.setblocking(0)
qname = dns.name.from_text('dns.google.')
q = dns.message.make_query(qname, dns.rdatatype.A)
- response = dns.query.tls(q, None, sock=s)
+ response = dns.query.tcp(q, None, sock=s)
rrs = response.get_rrset(response.answer, qname,
dns.rdataclass.IN, dns.rdatatype.A)
self.assertTrue(rrs is not None)
self.assertTrue('8.8.8.8' in seen)
self.assertTrue('8.8.4.4' in seen)
+ def testQueryTLS(self):
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ response = dns.query.tls(q, address)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
+
+ @unittest.skipUnless(have_ssl, "No SSL support")
+ def testQueryTLSWithSocket(self):
+ for address in query_addresses:
+ with socket.socket(dns.inet.af_for_address(address),
+ socket.SOCK_STREAM) as base_s:
+ ll = dns.inet.low_level_address_tuple((address, 853))
+ base_s.connect(ll)
+ ctx = ssl.create_default_context()
+ with ctx.wrap_socket(base_s, server_hostname='dns.google') as s:
+ s.setblocking(0)
+ qname = dns.name.from_text('dns.google.')
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ response = dns.query.tls(q, None, sock=s)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('8.8.8.8' in seen)
+ self.assertTrue('8.8.4.4' in seen)
+
def testQueryUDPFallback(self):
- qname = dns.name.from_text('.')
- q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
- (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8')
- self.assertTrue(tcp)
+ for address in query_addresses:
+ qname = dns.name.from_text('.')
+ q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
+ (_, tcp) = dns.query.udp_with_fallback(q, address)
+ self.assertTrue(tcp)
def testQueryUDPFallbackWithSocket(self):
- with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_s:
- udp_s.setblocking(0)
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_s:
- tcp_s.connect(('8.8.8.8', 53))
- tcp_s.setblocking(0)
- qname = dns.name.from_text('.')
- q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
- (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8',
- udp_sock=udp_s,
- tcp_sock=tcp_s)
- self.assertTrue(tcp)
+ for address in query_addresses:
+ af = dns.inet.af_for_address(address)
+ with socket.socket(af, socket.SOCK_DGRAM) as udp_s:
+ udp_s.setblocking(0)
+ with socket.socket(af, socket.SOCK_STREAM) as tcp_s:
+ ll = dns.inet.low_level_address_tuple((address, 53))
+ tcp_s.connect(ll)
+ tcp_s.setblocking(0)
+ qname = dns.name.from_text('.')
+ q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
+ (_, tcp) = dns.query.udp_with_fallback(q, address,
+ udp_sock=udp_s,
+ tcp_sock=tcp_s)
+ self.assertTrue(tcp)
def testQueryUDPFallbackNoFallback(self):
- qname = dns.name.from_text('dns.google.')
- q = dns.message.make_query(qname, dns.rdatatype.A)
- (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8')
- self.assertFalse(tcp)
+ for address in query_addresses:
+ qname = dns.name.from_text('dns.google.')
+ q = dns.message.make_query(qname, dns.rdatatype.A)
+ (_, tcp) = dns.query.udp_with_fallback(q, address)
+ self.assertFalse(tcp)
axfr_zone = '''