]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
test IPv4 and IPv6 according to availability
authorBob Halley <halley@dnspython.org>
Tue, 16 Jun 2020 00:20:00 +0000 (17:20 -0700)
committerBob Halley <halley@dnspython.org>
Tue, 16 Jun 2020 00:20:00 +0000 (17:20 -0700)
tests/test_async.py
tests/test_query.py

index 5c26309289e8d16948254c1f92ec584258f18d14..15c7eaa1ec789144123789ce51a7bdb898373c2a 100644 (file)
@@ -47,6 +47,20 @@ except socket.gaierror:
     _network_available = False
 
 
+# Probe for IPv4 and IPv6
+query_addresses = []
+for (af, address) in ((socket.AF_INET, '8.8.8.8'),
+                      (socket.AF_INET6, '2001:4860:4860::8888')):
+    try:
+        with socket.socket(af, socket.SOCK_DGRAM) as s:
+            # Connecting a UDP socket is supposed to return ENETUNREACH if
+            # no route to the network is present.
+            s.connect((address, 53))
+        query_addresses.append(address)
+    except Exception:
+        pass
+
+
 @unittest.skipIf(not _network_available, "Internet not reachable")
 class AsyncTests(unittest.TestCase):
 
@@ -112,113 +126,124 @@ class AsyncTests(unittest.TestCase):
         self.assertRaises(dns.resolver.NotAbsolute, bad)
 
     def testQueryUDP(self):
-        qname = dns.name.from_text('dns.google.')
-        async def run():
-            q = dns.message.make_query(qname, dns.rdatatype.A)
-            return await dns.asyncquery.udp(q, '8.8.8.8')
-        response = self.async_run(run)
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            async def run():
+                q = dns.message.make_query(qname, dns.rdatatype.A)
+                return await dns.asyncquery.udp(q, address)
+            response = self.async_run(run)
+            rrs = response.get_rrset(response.answer, qname,
+                                     dns.rdataclass.IN, dns.rdatatype.A)
+            self.assertTrue(rrs is not None)
+            seen = set([rdata.address for rdata in rrs])
+            self.assertTrue('8.8.8.8' in seen)
+            self.assertTrue('8.8.4.4' in seen)
 
     def testQueryUDPWithSocket(self):
-        qname = dns.name.from_text('dns.google.')
-        async def run():
-            async with await self.backend.make_socket(socket.AF_INET,
-                                                      socket.SOCK_DGRAM) as s:
-                q = dns.message.make_query(qname, dns.rdatatype.A)
-                return await dns.asyncquery.udp(q, '8.8.8.8', sock=s)
-        response = self.async_run(run)
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            async def run():
+                async with await self.backend.make_socket(
+                        dns.inet.af_for_address(address),
+                        socket.SOCK_DGRAM) as s:
+                    q = dns.message.make_query(qname, dns.rdatatype.A)
+                    return await dns.asyncquery.udp(q, address, sock=s)
+            response = self.async_run(run)
+            rrs = response.get_rrset(response.answer, qname,
+                                     dns.rdataclass.IN, dns.rdatatype.A)
+            self.assertTrue(rrs is not None)
+            seen = set([rdata.address for rdata in rrs])
+            self.assertTrue('8.8.8.8' in seen)
+            self.assertTrue('8.8.4.4' in seen)
 
     def testQueryTCP(self):
-        qname = dns.name.from_text('dns.google.')
-        async def run():
-            q = dns.message.make_query(qname, dns.rdatatype.A)
-            return await dns.asyncquery.tcp(q, '8.8.8.8')
-        response = self.async_run(run)
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            async def run():
+                q = dns.message.make_query(qname, dns.rdatatype.A)
+                return await dns.asyncquery.tcp(q, address)
+            response = self.async_run(run)
+            rrs = response.get_rrset(response.answer, qname,
+                                     dns.rdataclass.IN, dns.rdatatype.A)
+            self.assertTrue(rrs is not None)
+            seen = set([rdata.address for rdata in rrs])
+            self.assertTrue('8.8.8.8' in seen)
+            self.assertTrue('8.8.4.4' in seen)
 
     def testQueryTCPWithSocket(self):
-        qname = dns.name.from_text('dns.google.')
-        async def run():
-            async with await self.backend.make_socket(socket.AF_INET,
-                                                      socket.SOCK_STREAM, 0,
-                                                      None,
-                                                      ('8.8.8.8', 53)) as s:
-                q = dns.message.make_query(qname, dns.rdatatype.A)
-                return await dns.asyncquery.tcp(q, '8.8.8.8', sock=s)
-        response = self.async_run(run)
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            async def run():
+                async with await self.backend.make_socket(
+                        dns.inet.af_for_address(address),
+                        socket.SOCK_STREAM, 0,
+                        None,
+                        (address, 53)) as s:
+                    q = dns.message.make_query(qname, dns.rdatatype.A)
+                    return await dns.asyncquery.tcp(q, address, sock=s)
+            response = self.async_run(run)
+            rrs = response.get_rrset(response.answer, qname,
+                                     dns.rdataclass.IN, dns.rdatatype.A)
+            self.assertTrue(rrs is not None)
+            seen = set([rdata.address for rdata in rrs])
+            self.assertTrue('8.8.8.8' in seen)
+            self.assertTrue('8.8.4.4' in seen)
 
     @unittest.skipIf(not _ssl_available, "SSL not available")
     def testQueryTLS(self):
-        qname = dns.name.from_text('dns.google.')
-        async def run():
-            q = dns.message.make_query(qname, dns.rdatatype.A)
-            return await dns.asyncquery.tls(q, '8.8.8.8')
-        response = self.async_run(run)
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            async def run():
+                q = dns.message.make_query(qname, dns.rdatatype.A)
+                return await dns.asyncquery.tls(q, address)
+            response = self.async_run(run)
+            rrs = response.get_rrset(response.answer, qname,
+                                     dns.rdataclass.IN, dns.rdatatype.A)
+            self.assertTrue(rrs is not None)
+            seen = set([rdata.address for rdata in rrs])
+            self.assertTrue('8.8.8.8' in seen)
+            self.assertTrue('8.8.4.4' in seen)
 
     @unittest.skipIf(not _ssl_available, "SSL not available")
     def testQueryTLSWithSocket(self):
-        qname = dns.name.from_text('dns.google.')
-        async def run():
-            ssl_context = ssl.create_default_context()
-            ssl_context.check_hostname = False
-            async with await self.backend.make_socket(socket.AF_INET,
-                                                      socket.SOCK_STREAM, 0,
-                                                      None,
-                                                      ('8.8.8.8', 853), None,
-                                                      ssl_context, None) as s:
-                q = dns.message.make_query(qname, dns.rdatatype.A)
-                return await dns.asyncquery.tls(q, '8.8.8.8', sock=s)
-        response = self.async_run(run)
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            async def run():
+                ssl_context = ssl.create_default_context()
+                ssl_context.check_hostname = False
+                async with await self.backend.make_socket(
+                        dns.inet.af_for_address(address),
+                        socket.SOCK_STREAM, 0,
+                        None,
+                        (address, 853), None,
+                        ssl_context, None) as s:
+                    q = dns.message.make_query(qname, dns.rdatatype.A)
+                    return await dns.asyncquery.tls(q, '8.8.8.8', sock=s)
+            response = self.async_run(run)
+            rrs = response.get_rrset(response.answer, qname,
+                                     dns.rdataclass.IN, dns.rdatatype.A)
+            self.assertTrue(rrs is not None)
+            seen = set([rdata.address for rdata in rrs])
+            self.assertTrue('8.8.8.8' in seen)
+            self.assertTrue('8.8.4.4' in seen)
 
     def testQueryUDPFallback(self):
-        qname = dns.name.from_text('.')
-        async def run():
-            q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
-            return await dns.asyncquery.udp_with_fallback(q, '8.8.8.8')
-        (_, tcp) = self.async_run(run)
-        self.assertTrue(tcp)
+        for address in query_addresses:
+            qname = dns.name.from_text('.')
+            async def run():
+                q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
+                return await dns.asyncquery.udp_with_fallback(q, address)
+            (_, tcp) = self.async_run(run)
+            self.assertTrue(tcp)
 
     def testQueryUDPFallbackNoFallback(self):
-        qname = dns.name.from_text('dns.google.')
-        async def run():
-            q = dns.message.make_query(qname, dns.rdatatype.A)
-            return await dns.asyncquery.udp_with_fallback(q, '8.8.8.8')
-        (_, tcp) = self.async_run(run)
-        self.assertFalse(tcp)
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            async def run():
+                q = dns.message.make_query(qname, dns.rdatatype.A)
+                return await dns.asyncquery.udp_with_fallback(q, address)
+            (_, tcp) = self.async_run(run)
+            self.assertFalse(tcp)
 
 try:
     import trio
index 90fee03f9d235838115d70bf50bfe343ada0f4e5..df52dfe5a4fee3058769ce4b72a9e30c81d49b6a 100644 (file)
@@ -25,6 +25,7 @@ except Exception:
     have_ssl = False
 
 import dns.exception
+import dns.inet
 import dns.message
 import dns.name
 import dns.rdataclass
@@ -51,26 +52,27 @@ except ImportError:
     class Server(object):
         pass
 
+# Probe for IPv4 and IPv6
+query_addresses = []
+for (af, address) in ((socket.AF_INET, '8.8.8.8'),
+                      (socket.AF_INET6, '2001:4860:4860::8888')):
+    try:
+        with socket.socket(af, socket.SOCK_DGRAM) as s:
+            # Connecting a UDP socket is supposed to return ENETUNREACH if
+            # no route to the network is present.
+            s.connect((address, 53))
+        query_addresses.append(address)
+    except Exception:
+        pass
+
 @unittest.skipIf(not _network_available, "Internet not reachable")
 class QueryTests(unittest.TestCase):
 
     def testQueryUDP(self):
-        qname = dns.name.from_text('dns.google.')
-        q = dns.message.make_query(qname, dns.rdatatype.A)
-        response = dns.query.udp(q, '8.8.8.8')
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
-
-    def testQueryUDPWithSocket(self):
-        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
-            s.setblocking(0)
+        for address in query_addresses:
             qname = dns.name.from_text('dns.google.')
             q = dns.message.make_query(qname, dns.rdatatype.A)
-            response = dns.query.udp(q, '8.8.8.8', sock=s)
+            response = dns.query.udp(q, address)
             rrs = response.get_rrset(response.answer, qname,
                                      dns.rdataclass.IN, dns.rdatatype.A)
             self.assertTrue(rrs is not None)
@@ -78,24 +80,26 @@ class QueryTests(unittest.TestCase):
             self.assertTrue('8.8.8.8' in seen)
             self.assertTrue('8.8.4.4' in seen)
 
-    def testQueryTCP(self):
-        qname = dns.name.from_text('dns.google.')
-        q = dns.message.make_query(qname, dns.rdatatype.A)
-        response = dns.query.tcp(q, '8.8.8.8')
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
+    def testQueryUDPWithSocket(self):
+        for address in query_addresses:
+            with socket.socket(dns.inet.af_for_address(address),
+                               socket.SOCK_DGRAM) as s:
+                s.setblocking(0)
+                qname = dns.name.from_text('dns.google.')
+                q = dns.message.make_query(qname, dns.rdatatype.A)
+                response = dns.query.udp(q, address, sock=s)
+                rrs = response.get_rrset(response.answer, qname,
+                                         dns.rdataclass.IN, dns.rdatatype.A)
+                self.assertTrue(rrs is not None)
+                seen = set([rdata.address for rdata in rrs])
+                self.assertTrue('8.8.8.8' in seen)
+                self.assertTrue('8.8.4.4' in seen)
 
-    def testQueryTCPWithSocket(self):
-        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-            s.connect(('8.8.8.8', 53))
-            s.setblocking(0)
+    def testQueryTCP(self):
+        for address in query_addresses:
             qname = dns.name.from_text('dns.google.')
             q = dns.message.make_query(qname, dns.rdatatype.A)
-            response = dns.query.tcp(q, None, sock=s)
+            response = dns.query.tcp(q, address)
             rrs = response.get_rrset(response.answer, qname,
                                      dns.rdataclass.IN, dns.rdatatype.A)
             self.assertTrue(rrs is not None)
@@ -103,27 +107,16 @@ class QueryTests(unittest.TestCase):
             self.assertTrue('8.8.8.8' in seen)
             self.assertTrue('8.8.4.4' in seen)
 
-    def testQueryTLS(self):
-        qname = dns.name.from_text('dns.google.')
-        q = dns.message.make_query(qname, dns.rdatatype.A)
-        response = dns.query.tls(q, '8.8.8.8')
-        rrs = response.get_rrset(response.answer, qname,
-                                 dns.rdataclass.IN, dns.rdatatype.A)
-        self.assertTrue(rrs is not None)
-        seen = set([rdata.address for rdata in rrs])
-        self.assertTrue('8.8.8.8' in seen)
-        self.assertTrue('8.8.4.4' in seen)
-
-    @unittest.skipUnless(have_ssl, "No SSL support")
-    def testQueryTLSWithSocket(self):
-        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as base_s:
-            base_s.connect(('8.8.8.8', 853))
-            ctx = ssl.create_default_context()
-            with ctx.wrap_socket(base_s, server_hostname='dns.google') as s:
+    def testQueryTCPWithSocket(self):
+        for address in query_addresses:
+            with socket.socket(dns.inet.af_for_address(address),
+                               socket.SOCK_STREAM) as s:
+                ll = dns.inet.low_level_address_tuple((address, 53))
+                s.connect(ll)
                 s.setblocking(0)
                 qname = dns.name.from_text('dns.google.')
                 q = dns.message.make_query(qname, dns.rdatatype.A)
-                response = dns.query.tls(q, None, sock=s)
+                response = dns.query.tcp(q, None, sock=s)
                 rrs = response.get_rrset(response.answer, qname,
                                          dns.rdataclass.IN, dns.rdatatype.A)
                 self.assertTrue(rrs is not None)
@@ -131,30 +124,67 @@ class QueryTests(unittest.TestCase):
                 self.assertTrue('8.8.8.8' in seen)
                 self.assertTrue('8.8.4.4' in seen)
 
+    def testQueryTLS(self):
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            q = dns.message.make_query(qname, dns.rdatatype.A)
+            response = dns.query.tls(q, address)
+            rrs = response.get_rrset(response.answer, qname,
+                                     dns.rdataclass.IN, dns.rdatatype.A)
+            self.assertTrue(rrs is not None)
+            seen = set([rdata.address for rdata in rrs])
+            self.assertTrue('8.8.8.8' in seen)
+            self.assertTrue('8.8.4.4' in seen)
+
+    @unittest.skipUnless(have_ssl, "No SSL support")
+    def testQueryTLSWithSocket(self):
+        for address in query_addresses:
+            with socket.socket(dns.inet.af_for_address(address),
+                               socket.SOCK_STREAM) as base_s:
+                ll = dns.inet.low_level_address_tuple((address, 853))
+                base_s.connect(ll)
+                ctx = ssl.create_default_context()
+                with ctx.wrap_socket(base_s, server_hostname='dns.google') as s:
+                    s.setblocking(0)
+                    qname = dns.name.from_text('dns.google.')
+                    q = dns.message.make_query(qname, dns.rdatatype.A)
+                    response = dns.query.tls(q, None, sock=s)
+                    rrs = response.get_rrset(response.answer, qname,
+                                             dns.rdataclass.IN, dns.rdatatype.A)
+                    self.assertTrue(rrs is not None)
+                    seen = set([rdata.address for rdata in rrs])
+                    self.assertTrue('8.8.8.8' in seen)
+                    self.assertTrue('8.8.4.4' in seen)
+
     def testQueryUDPFallback(self):
-        qname = dns.name.from_text('.')
-        q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
-        (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8')
-        self.assertTrue(tcp)
+        for address in query_addresses:
+            qname = dns.name.from_text('.')
+            q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
+            (_, tcp) = dns.query.udp_with_fallback(q, address)
+            self.assertTrue(tcp)
 
     def testQueryUDPFallbackWithSocket(self):
-        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_s:
-            udp_s.setblocking(0)
-            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_s:
-                tcp_s.connect(('8.8.8.8', 53))
-                tcp_s.setblocking(0)
-                qname = dns.name.from_text('.')
-                q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
-                (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8',
-                                                      udp_sock=udp_s,
-                                                      tcp_sock=tcp_s)
-                self.assertTrue(tcp)
+        for address in query_addresses:
+            af = dns.inet.af_for_address(address)
+            with socket.socket(af, socket.SOCK_DGRAM) as udp_s:
+                udp_s.setblocking(0)
+                with socket.socket(af, socket.SOCK_STREAM) as tcp_s:
+                    ll = dns.inet.low_level_address_tuple((address, 53))
+                    tcp_s.connect(ll)
+                    tcp_s.setblocking(0)
+                    qname = dns.name.from_text('.')
+                    q = dns.message.make_query(qname, dns.rdatatype.DNSKEY)
+                    (_, tcp) = dns.query.udp_with_fallback(q, address,
+                                                          udp_sock=udp_s,
+                                                          tcp_sock=tcp_s)
+                    self.assertTrue(tcp)
 
     def testQueryUDPFallbackNoFallback(self):
-        qname = dns.name.from_text('dns.google.')
-        q = dns.message.make_query(qname, dns.rdatatype.A)
-        (_, tcp) = dns.query.udp_with_fallback(q, '8.8.8.8')
-        self.assertFalse(tcp)
+        for address in query_addresses:
+            qname = dns.name.from_text('dns.google.')
+            q = dns.message.make_query(qname, dns.rdatatype.A)
+            (_, tcp) = dns.query.udp_with_fallback(q, address)
+            self.assertFalse(tcp)
 
 
 axfr_zone = '''