]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
Improve network checking [#812].
authorBob Halley <halley@dnspython.org>
Sat, 4 Jun 2022 21:03:56 +0000 (14:03 -0700)
committerBob Halley <halley@dnspython.org>
Sat, 4 Jun 2022 21:03:56 +0000 (14:03 -0700)
tests/test_doh.py
tests/test_query.py
tests/test_resolver.py
tests/util.py

index c4d6476cd1ac8cdeb0df09f93f73aa2bdb10c905..3626bf37df726db45b61577071929c73bfa64952 100644 (file)
@@ -40,31 +40,20 @@ if dns.query._have_httpx:
 
 import tests.util
 
-# Probe for IPv4 and IPv6
 resolver_v4_addresses = []
 resolver_v6_addresses = []
-try:
-    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
-        s.settimeout(4)
-        s.connect(("8.8.8.8", 53))
+if tests.util.have_ipv4():
     resolver_v4_addresses = [
         "1.1.1.1",
         "8.8.8.8",
         # '9.9.9.9',
     ]
-except Exception:
-    pass
-try:
-    with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
-        s.connect(("2001:4860:4860::8888", 53))
+if tests.util.have_ipv6():
     resolver_v6_addresses = [
         "2606:4700:4700::1111",
-        # Google says 404
-        # '2001:4860:4860::8888',
+        "2001:4860:4860::8888",
         # '2620:fe::fe',
     ]
-except Exception:
-    pass
 
 KNOWN_ANYCAST_DOH_RESOLVER_URLS = [
     "https://cloudflare-dns.com/dns-query",
index ed2f112dfff082ba9b95344e5a40895f56ddadf4..4586e32169997d9073f383024ab1e2fd41e7254c 100644 (file)
@@ -52,20 +52,11 @@ except ImportError:
         pass
 
 
-# Probe for IPv4 and IPv6
 query_addresses = []
-for (af, address) in (
-    (socket.AF_INET, "8.8.8.8"),
-    (socket.AF_INET6, "2001:4860:4860::8888"),
-):
-    try:
-        with socket.socket(af, socket.SOCK_DGRAM) as s:
-            # Connecting a UDP socket is supposed to return ENETUNREACH if
-            # no route to the network is present.
-            s.connect((address, 53))
-        query_addresses.append(address)
-    except Exception:
-        pass
+if tests.util.have_ipv4():
+    query_addresses.append("8.8.8.8")
+if tests.util.have_ipv6():
+    query_addresses.append("2001:4860:4860::8888")
 
 keyring = dns.tsigkeyring.from_text({"name": "tDz6cfXXGtNivRpQ98hr6A=="})
 
index 9b8ea3d0285007f110466bce73598aa960e14492..d21127d79d48f4ca8c6d120b57ee381b04c93d53 100644 (file)
@@ -705,6 +705,7 @@ class LiveResolverTests(unittest.TestCase):
             self.assertIn(qname, nx.qnames())
             self.assertGreaterEqual(len(nx.responses()), 1)
 
+    @unittest.skipIf(not tests.util.have_ipv4(), "IPv4 not reachable")
     def testResolveCacheHit(self):
         res = dns.resolver.Resolver(configure=False)
         res.nameservers = ["8.8.8.8"]
index ffce722d32cdca75c701f1d03d586cc53a12a004..bf417e4cae3c78b1956423617543ea77f93e3109 100644 (file)
@@ -20,33 +20,93 @@ import inspect
 import os
 import socket
 
+import dns.message
+import dns.name
+import dns.query
+import dns.rdataclass
+import dns.rdatatype
+
 # Cache for is_internet_reachable()
 _internet_reachable = None
+_have_ipv4 = False
+_have_ipv6 = False
 
 
 def here(filename):
     return os.path.join(os.path.dirname(__file__), filename)
 
 
+def check_networking(addresses):
+    """Can we do a DNS resolution via UDP and TCP to at least one of the addresses?"""
+    for address in addresses:
+        try:
+            q = dns.message.make_query(dns.name.root, dns.rdatatype.NS)
+            ok = False
+            # We try UDP a few times in case we get unlucky and a packet is lost.
+            for i in range(5):
+                # We don't check the answer other than make sure there is one.
+                try:
+                    r = dns.query.udp(q, address, timeout=4)
+                    ns = r.find_rrset(
+                        r.answer, dns.name.root, dns.rdataclass.IN, dns.rdatatype.NS
+                    )
+                    ok = True
+                    break
+                except Exception:
+                    continue  # UDP try loop
+            if not ok:
+                continue  # addresses loop
+            try:
+                r = dns.query.tcp(q, address, timeout=4)
+                ns = r.find_rrset(
+                    r.answer, dns.name.root, dns.rdataclass.IN, dns.rdatatype.NS
+                )
+                # UDP and TCP both work!
+                return True
+            except Exception:
+                continue
+        except Exception as e:
+            pass
+    return False
+
+
 def is_internet_reachable():
     """Check if the Internet is reachable.
 
     Setting the environment variable `NO_INTERNET` will let this
     function always return False. The result is cached.
+
+    We check using the Google and Cloudflare public resolvers as they are highly
+    available and have well-known stable addresses.
     """
     global _internet_reachable
     if _internet_reachable is None:
         if os.environ.get("NO_INTERNET"):
             _internet_reachable = False
         else:
-            try:
-                socket.gethostbyname("dnspython.org")
-                _internet_reachable = True
-            except socket.gaierror:
-                _internet_reachable = False
+            global _have_ipv4
+            _have_ipv4 = check_networking(["8.8.8.8", "1.1.1.1"])
+            global _have_ipv6
+            _have_ipv6 = check_networking(
+                ["2001:4860:4860::8888", "2606:4700:4700::1111"]
+            )
+            print(_have_ipv4 or _have_ipv6)
+            _internet_reachable = _have_ipv4 or _have_ipv6
     return _internet_reachable
 
 
+def have_ipv4():
+    if not is_internet_reachable():
+        return False
+    return _have_ipv4
+
+
+def have_ipv6():
+    if not is_internet_reachable():
+        return False
+    return _have_ipv6
+
+
 def enumerate_module(module, super_class):
     """Yield module attributes which are subclasses of given class"""
     for attr_name in dir(module):