]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
If a negative response has an SOA in the authority section, then 673/head
authorBob Halley <halley@dnspython.org>
Sun, 11 Jul 2021 21:23:48 +0000 (14:23 -0700)
committerBob Halley <halley@dnspython.org>
Sun, 11 Jul 2021 21:23:48 +0000 (14:23 -0700)
zone_for_name() will now use it to make the search more efficient.

zone_for_name() now has an optional lifetime parameter which limits the
total time that can be spent resolving.

dns/resolver.py
tests/test_resolver.py

index 10b6ca5a176f489871cfa8ca5de64af4fa251a05..6a9974d830ca1be274997fb5ba774b4f34421bf6 100644 (file)
@@ -170,6 +170,9 @@ class NoAnswer(dns.exception.DNSException):
     def _fmt_kwargs(self, **kwargs):
         return super()._fmt_kwargs(query=kwargs['response'].question)
 
+    def response(self):
+        return self.kwargs['response']
+
 
 class NoNameservers(dns.exception.DNSException):
     """All nameservers failed to answer the query.
@@ -1368,7 +1371,8 @@ def canonical_name(name):
     return get_default_resolver().canonical_name(name)
 
 
-def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
+def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None,
+                  lifetime=None):
     """Find the name of the zone which contains the specified name.
 
     *name*, an absolute ``dns.name.Name`` or ``str``, the query name.
@@ -1378,12 +1382,19 @@ def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
     *tcp*, a ``bool``.  If ``True``, use TCP to make the query.
 
     *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use.
-    If ``None``, the default resolver is used.
+    If ``None``, the default, then the default resolver is used.
+
+    *lifetime*, a ``float``, the total time to allow for the queries needed
+    to determine the zone.  If ``None``, the default, then only the individual
+    query limits of the resolver apply.
 
     Raises ``dns.resolver.NoRootSOA`` if there is no SOA RR at the DNS
     root.  (This is only likely to happen if you're using non-default
     root servers in your network and they are misconfigured.)
 
+    Raises ``dns.resolver.LifetimeTimeout`` if the answer could not be
+    found in the alotted lifetime.
+
     Returns a ``dns.name.Name``.
     """
 
@@ -1393,14 +1404,44 @@ def zone_for_name(name, rdclass=dns.rdataclass.IN, tcp=False, resolver=None):
         resolver = get_default_resolver()
     if not name.is_absolute():
         raise NotAbsolute(name)
+    start = time.time()
+    if lifetime is not None:
+        expiration = start + lifetime
+    else:
+        expiration = None
     while 1:
         try:
-            answer = resolver.resolve(name, dns.rdatatype.SOA, rdclass, tcp)
+            if expiration:
+                rlifetime = expiration - time.time()
+                if rlifetime <= 0:
+                    rlifetime = 0
+            else:
+                rlifetime = None
+            answer = resolver.resolve(name, dns.rdatatype.SOA, rdclass, tcp,
+                                      lifetime=rlifetime)
             if answer.rrset.name == name:
                 return name
             # otherwise we were CNAMEd or DNAMEd and need to look higher
-        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
-            pass
+        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
+            if isinstance(e, dns.resolver.NXDOMAIN):
+                response = e.responses().get(name)
+            else:
+                response = e.response()
+            if response:
+                for rrs in response.authority:
+                    if rrs.rdtype == dns.rdatatype.SOA and \
+                       rrs.rdclass == rdclass:
+                       (nr, _, _) = rrs.name.fullcompare(name)
+                       if nr == dns.name.NAMERELN_SUPERDOMAIN:
+                           # We're doing a proper superdomain check as
+                           # if the name were equal we ought to have gotten
+                           # it in the answer section!  We are ignoring the
+                           # possibility that the authority is insane and
+                           # is including multiple SOA RRs for different
+                           # authorities.
+                           return rrs.name
+            # we couldn't extract anything useful from the response (e.g. it's
+            # a type 3 NXDOMAIN)
         try:
             name = name.parent()
         except dns.name.NoParent:
index d15148500517e65ceb6eb09c2082a45eb8bd3a3d..b2a47d239bc5d3a7793e3b012dc7b8e324290730 100644 (file)
@@ -947,9 +947,41 @@ class AlwaysType3NXDOMAINNanoNameserver(Server):
         response.flags |= dns.flags.RA
         return response
 
+
+class AlwaysNXDOMAINNanoNameserver(Server):
+
+    def handle(self, request):
+        response = dns.message.make_response(request.message)
+        response.set_rcode(dns.rcode.NXDOMAIN)
+        response.flags |= dns.flags.RA
+        origin = dns.name.from_text('example.')
+        soa_rrset = response.find_rrset(response.authority, origin,
+                                        dns.rdataclass.IN, dns.rdatatype.SOA,
+                                        create=True)
+        rdata = dns.rdata.from_text('IN', 'SOA',
+                                    'ns.example. root.example. 1 2 3 4 5')
+        soa_rrset.add(rdata)
+        soa_rrset.update_ttl(300)
+        return response
+
+class AlwaysNoErrorNoDataNanoNameserver(Server):
+
+    def handle(self, request):
+        response = dns.message.make_response(request.message)
+        response.set_rcode(dns.rcode.NOERROR)
+        response.flags |= dns.flags.RA
+        origin = dns.name.from_text('example.')
+        soa_rrset = response.find_rrset(response.authority, origin,
+                                        dns.rdataclass.IN, dns.rdatatype.SOA,
+                                        create=True)
+        rdata = dns.rdata.from_text('IN', 'SOA',
+                                    'ns.example. root.example. 1 2 3 4 5')
+        soa_rrset.add(rdata)
+        soa_rrset.update_ttl(300)
+        return response
 @unittest.skipIf(not (_network_available and _nanonameserver_available),
                  "Internet and NanoAuth required")
-class ZoneForNameNoParentTest(unittest.TestCase):
+class ZoneForNameTests(unittest.TestCase):
 
     def testNoRootSOA(self):
         with AlwaysType3NXDOMAINNanoNameserver() as na:
@@ -959,6 +991,25 @@ class ZoneForNameNoParentTest(unittest.TestCase):
             with self.assertRaises(dns.resolver.NoRootSOA):
                 dns.resolver.zone_for_name('www.foo.bar.', resolver=res)
 
+    def testHelpfulNXDOMAIN(self):
+        with AlwaysNXDOMAINNanoNameserver() as na:
+            res = dns.resolver.Resolver(configure=False)
+            res.port = na.udp_address[1]
+            res.nameservers = [na.udp_address[0]]
+            expected = dns.name.from_text('example.')
+            name = dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.',
+                                              resolver=res)
+            self.assertEqual(name, expected)
+
+    def testHelpfulNoErrorNoData(self):
+        with AlwaysNoErrorNoDataNanoNameserver() as na:
+            res = dns.resolver.Resolver(configure=False)
+            res.port = na.udp_address[1]
+            res.nameservers = [na.udp_address[0]]
+            expected = dns.name.from_text('example.')
+            name = dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.',
+                                              resolver=res)
+            self.assertEqual(name, expected)
 
 class DroppingNanoNameserver(Server):
 
@@ -1019,3 +1070,25 @@ def testResolverNoNameservers():
                 assert not error[1]  # not TCP
                 assert error[2] == na.udp_address[1]  # port
                 assert error[3] == 'FORMERR'
+
+
+class SlowAlwaysType3NXDOMAINNanoNameserver(Server):
+
+    def handle(self, request):
+        response = dns.message.make_response(request.message)
+        response.set_rcode(dns.rcode.NXDOMAIN)
+        response.flags |= dns.flags.RA
+        time.sleep(0.2)
+        return response
+
+
+@pytest.mark.skipif(not (_network_available and _nanonameserver_available),
+                    reason="Internet and NanoAuth required")
+def testZoneForNameLifetimeTimeout():
+    with SlowAlwaysType3NXDOMAINNanoNameserver() as na:
+        res = dns.resolver.Resolver(configure=False)
+        res.port = na.udp_address[1]
+        res.nameservers = [na.udp_address[0]]
+        with pytest.raises(dns.resolver.LifetimeTimeout):
+            dns.resolver.zone_for_name('1.2.3.4.5.6.7.8.9.10.example.',
+                resolver=res, lifetime=1.0)