From: Bob Halley Date: Tue, 16 Jun 2020 00:20:00 +0000 (-0700) Subject: test IPv4 and IPv6 according to availability X-Git-Tag: v2.0.0rc1~99 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=12e607a9878607df0590ab231d5e81040c1568c5;p=thirdparty%2Fdnspython.git test IPv4 and IPv6 according to availability --- diff --git a/tests/test_async.py b/tests/test_async.py index 5c263092..15c7eaa1 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -47,6 +47,20 @@ except socket.gaierror: _network_available = False +# Probe for IPv4 and IPv6 +query_addresses = [] +for (af, address) in ((socket.AF_INET, '8.8.8.8'), + (socket.AF_INET6, '2001:4860:4860::8888')): + try: + with socket.socket(af, socket.SOCK_DGRAM) as s: + # Connecting a UDP socket is supposed to return ENETUNREACH if + # no route to the network is present. + s.connect((address, 53)) + query_addresses.append(address) + except Exception: + pass + + @unittest.skipIf(not _network_available, "Internet not reachable") class AsyncTests(unittest.TestCase): @@ -112,113 +126,124 @@ class AsyncTests(unittest.TestCase): self.assertRaises(dns.resolver.NotAbsolute, bad) def testQueryUDP(self): - qname = dns.name.from_text('dns.google.') - async def run(): - q = dns.message.make_query(qname, dns.rdatatype.A) - return await dns.asyncquery.udp(q, '8.8.8.8') - response = self.async_run(run) - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + async def run(): + q = dns.message.make_query(qname, dns.rdatatype.A) + return await dns.asyncquery.udp(q, address) + response = self.async_run(run) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) def testQueryUDPWithSocket(self): - qname = dns.name.from_text('dns.google.') - async def run(): - async with await self.backend.make_socket(socket.AF_INET, - socket.SOCK_DGRAM) as s: - q = dns.message.make_query(qname, dns.rdatatype.A) - return await dns.asyncquery.udp(q, '8.8.8.8', sock=s) - response = self.async_run(run) - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + async def run(): + async with await self.backend.make_socket( + dns.inet.af_for_address(address), + socket.SOCK_DGRAM) as s: + q = dns.message.make_query(qname, dns.rdatatype.A) + return await dns.asyncquery.udp(q, address, sock=s) + response = self.async_run(run) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) def testQueryTCP(self): - qname = dns.name.from_text('dns.google.') - async def run(): - q = dns.message.make_query(qname, dns.rdatatype.A) - return await dns.asyncquery.tcp(q, '8.8.8.8') - response = self.async_run(run) - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + async def run(): + q = dns.message.make_query(qname, dns.rdatatype.A) + return await dns.asyncquery.tcp(q, address) + response = self.async_run(run) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) def testQueryTCPWithSocket(self): - qname = dns.name.from_text('dns.google.') - async def run(): - async with await self.backend.make_socket(socket.AF_INET, - socket.SOCK_STREAM, 0, - None, - ('8.8.8.8', 53)) as s: - q = dns.message.make_query(qname, dns.rdatatype.A) - return await dns.asyncquery.tcp(q, '8.8.8.8', sock=s) - response = self.async_run(run) - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + async def run(): + async with await self.backend.make_socket( + dns.inet.af_for_address(address), + socket.SOCK_STREAM, 0, + None, + (address, 53)) as s: + q = dns.message.make_query(qname, dns.rdatatype.A) + return await dns.asyncquery.tcp(q, address, sock=s) + response = self.async_run(run) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) @unittest.skipIf(not _ssl_available, "SSL not available") def testQueryTLS(self): - qname = dns.name.from_text('dns.google.') - async def run(): - q = dns.message.make_query(qname, dns.rdatatype.A) - return await dns.asyncquery.tls(q, '8.8.8.8') - response = self.async_run(run) - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + async def run(): + q = dns.message.make_query(qname, dns.rdatatype.A) + return await dns.asyncquery.tls(q, address) + response = self.async_run(run) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) @unittest.skipIf(not _ssl_available, "SSL not available") def testQueryTLSWithSocket(self): - qname = dns.name.from_text('dns.google.') - async def run(): - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - async with await self.backend.make_socket(socket.AF_INET, - socket.SOCK_STREAM, 0, - None, - ('8.8.8.8', 853), None, - ssl_context, None) as s: - q = dns.message.make_query(qname, dns.rdatatype.A) - return await dns.asyncquery.tls(q, '8.8.8.8', sock=s) - response = self.async_run(run) - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + async def run(): + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + async with await self.backend.make_socket( + dns.inet.af_for_address(address), + socket.SOCK_STREAM, 0, + None, + (address, 853), None, + ssl_context, None) as s: + q = dns.message.make_query(qname, dns.rdatatype.A) + return await dns.asyncquery.tls(q, '8.8.8.8', sock=s) + response = self.async_run(run) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) def testQueryUDPFallback(self): - qname = dns.name.from_text('.') - async def run(): - q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) - return await dns.asyncquery.udp_with_fallback(q, '8.8.8.8') - (_, tcp) = self.async_run(run) - self.assertTrue(tcp) + for address in query_addresses: + qname = dns.name.from_text('.') + async def run(): + q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) + return await dns.asyncquery.udp_with_fallback(q, address) + (_, tcp) = self.async_run(run) + self.assertTrue(tcp) def testQueryUDPFallbackNoFallback(self): - qname = dns.name.from_text('dns.google.') - async def run(): - q = dns.message.make_query(qname, dns.rdatatype.A) - return await dns.asyncquery.udp_with_fallback(q, '8.8.8.8') - (_, tcp) = self.async_run(run) - self.assertFalse(tcp) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + async def run(): + q = dns.message.make_query(qname, dns.rdatatype.A) + return await dns.asyncquery.udp_with_fallback(q, address) + (_, tcp) = self.async_run(run) + self.assertFalse(tcp) try: import trio diff --git a/tests/test_query.py b/tests/test_query.py index 90fee03f..df52dfe5 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -25,6 +25,7 @@ except Exception: have_ssl = False import dns.exception +import dns.inet import dns.message import dns.name import dns.rdataclass @@ -51,26 +52,27 @@ except ImportError: class Server(object): pass +# Probe for IPv4 and IPv6 +query_addresses = [] +for (af, address) in ((socket.AF_INET, '8.8.8.8'), + (socket.AF_INET6, '2001:4860:4860::8888')): + try: + with socket.socket(af, socket.SOCK_DGRAM) as s: + # Connecting a UDP socket is supposed to return ENETUNREACH if + # no route to the network is present. + s.connect((address, 53)) + query_addresses.append(address) + except Exception: + pass + @unittest.skipIf(not _network_available, "Internet not reachable") class QueryTests(unittest.TestCase): def testQueryUDP(self): - qname = dns.name.from_text('dns.google.') - q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.udp(q, '8.8.8.8') - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) - - def testQueryUDPWithSocket(self): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.setblocking(0) + for address in query_addresses: qname = dns.name.from_text('dns.google.') q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.udp(q, '8.8.8.8', sock=s) + response = dns.query.udp(q, address) rrs = response.get_rrset(response.answer, qname, dns.rdataclass.IN, dns.rdatatype.A) self.assertTrue(rrs is not None) @@ -78,24 +80,26 @@ class QueryTests(unittest.TestCase): self.assertTrue('8.8.8.8' in seen) self.assertTrue('8.8.4.4' in seen) - def testQueryTCP(self): - qname = dns.name.from_text('dns.google.') - q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.tcp(q, '8.8.8.8') - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) + def testQueryUDPWithSocket(self): + for address in query_addresses: + with socket.socket(dns.inet.af_for_address(address), + socket.SOCK_DGRAM) as s: + s.setblocking(0) + qname = dns.name.from_text('dns.google.') + q = dns.message.make_query(qname, dns.rdatatype.A) + response = dns.query.udp(q, address, sock=s) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) - def testQueryTCPWithSocket(self): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect(('8.8.8.8', 53)) - s.setblocking(0) + def testQueryTCP(self): + for address in query_addresses: qname = dns.name.from_text('dns.google.') q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.tcp(q, None, sock=s) + response = dns.query.tcp(q, address) rrs = response.get_rrset(response.answer, qname, dns.rdataclass.IN, dns.rdatatype.A) self.assertTrue(rrs is not None) @@ -103,27 +107,16 @@ class QueryTests(unittest.TestCase): self.assertTrue('8.8.8.8' in seen) self.assertTrue('8.8.4.4' in seen) - def testQueryTLS(self): - qname = dns.name.from_text('dns.google.') - q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.tls(q, '8.8.8.8') - rrs = response.get_rrset(response.answer, qname, - dns.rdataclass.IN, dns.rdatatype.A) - self.assertTrue(rrs is not None) - seen = set([rdata.address for rdata in rrs]) - self.assertTrue('8.8.8.8' in seen) - self.assertTrue('8.8.4.4' in seen) - - @unittest.skipUnless(have_ssl, "No SSL support") - def testQueryTLSWithSocket(self): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as base_s: - base_s.connect(('8.8.8.8', 853)) - ctx = ssl.create_default_context() - with ctx.wrap_socket(base_s, server_hostname='dns.google') as s: + def testQueryTCPWithSocket(self): + for address in query_addresses: + with socket.socket(dns.inet.af_for_address(address), + socket.SOCK_STREAM) as s: + ll = dns.inet.low_level_address_tuple((address, 53)) + s.connect(ll) s.setblocking(0) qname = dns.name.from_text('dns.google.') q = dns.message.make_query(qname, dns.rdatatype.A) - response = dns.query.tls(q, None, sock=s) + response = dns.query.tcp(q, None, sock=s) rrs = response.get_rrset(response.answer, qname, dns.rdataclass.IN, dns.rdatatype.A) self.assertTrue(rrs is not None) @@ -131,30 +124,67 @@ class QueryTests(unittest.TestCase): self.assertTrue('8.8.8.8' in seen) self.assertTrue('8.8.4.4' in seen) + def testQueryTLS(self): + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + q = dns.message.make_query(qname, dns.rdatatype.A) + response = dns.query.tls(q, address) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) + + @unittest.skipUnless(have_ssl, "No SSL support") + def testQueryTLSWithSocket(self): + for address in query_addresses: + with socket.socket(dns.inet.af_for_address(address), + socket.SOCK_STREAM) as base_s: + ll = dns.inet.low_level_address_tuple((address, 853)) + base_s.connect(ll) + ctx = ssl.create_default_context() + with ctx.wrap_socket(base_s, server_hostname='dns.google') as s: + s.setblocking(0) + qname = dns.name.from_text('dns.google.') + q = dns.message.make_query(qname, dns.rdatatype.A) + response = dns.query.tls(q, None, sock=s) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('8.8.8.8' in seen) + self.assertTrue('8.8.4.4' in seen) + def testQueryUDPFallback(self): - qname = dns.name.from_text('.') - q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) - (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8') - self.assertTrue(tcp) + for address in query_addresses: + qname = dns.name.from_text('.') + q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) + (_, tcp) = dns.query.udp_with_fallback(q, address) + self.assertTrue(tcp) def testQueryUDPFallbackWithSocket(self): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_s: - udp_s.setblocking(0) - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_s: - tcp_s.connect(('8.8.8.8', 53)) - tcp_s.setblocking(0) - qname = dns.name.from_text('.') - q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) - (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8', - udp_sock=udp_s, - tcp_sock=tcp_s) - self.assertTrue(tcp) + for address in query_addresses: + af = dns.inet.af_for_address(address) + with socket.socket(af, socket.SOCK_DGRAM) as udp_s: + udp_s.setblocking(0) + with socket.socket(af, socket.SOCK_STREAM) as tcp_s: + ll = dns.inet.low_level_address_tuple((address, 53)) + tcp_s.connect(ll) + tcp_s.setblocking(0) + qname = dns.name.from_text('.') + q = dns.message.make_query(qname, dns.rdatatype.DNSKEY) + (_, tcp) = dns.query.udp_with_fallback(q, address, + udp_sock=udp_s, + tcp_sock=tcp_s) + self.assertTrue(tcp) def testQueryUDPFallbackNoFallback(self): - qname = dns.name.from_text('dns.google.') - q = dns.message.make_query(qname, dns.rdatatype.A) - (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8') - self.assertFalse(tcp) + for address in query_addresses: + qname = dns.name.from_text('dns.google.') + q = dns.message.make_query(qname, dns.rdatatype.A) + (_, tcp) = dns.query.udp_with_fallback(q, address) + self.assertFalse(tcp) axfr_zone = '''