]> git.ipfire.org Git - thirdparty/dnspython.git/commitdiff
Test sending a query with TSIG.
authorBrian Wellington <bwelling@xbill.org>
Tue, 16 Jun 2020 20:42:31 +0000 (13:42 -0700)
committerBrian Wellington <bwelling@xbill.org>
Tue, 16 Jun 2020 20:43:16 +0000 (13:43 -0700)
tests/test_query.py

index df52dfe5a4fee3058769ce4b72a9e30c81d49b6a..ec490f1cfdc2773a27d230b86aa297f62cf3113d 100644 (file)
@@ -31,6 +31,7 @@ import dns.name
 import dns.rdataclass
 import dns.rdatatype
 import dns.query
+import dns.tsigkeyring
 import dns.zone
 
 # Some tests require the internet to be available to run, so let's
@@ -65,6 +66,8 @@ for (af, address) in ((socket.AF_INET, '8.8.8.8'),
     except Exception:
         pass
 
+keyring = dns.tsigkeyring.from_text({'name' : 'tDz6cfXXGtNivRpQ98hr6A=='})
+
 @unittest.skipIf(not _network_available, "Internet not reachable")
 class QueryTests(unittest.TestCase):
 
@@ -392,3 +395,39 @@ class XfrTests(unittest.TestCase):
                                     relativize=False)
                 l = list(xfr)
         self.assertRaises(dns.exception.FormError, bad)
+
+class TSIGNanoNameserver(Server):
+
+    def handle(self, message, peer, connection_type):
+        response = dns.message.make_response(message)
+        response.set_rcode(dns.rcode.REFUSED)
+        response.flags |= dns.flags.RA
+        try:
+            if message.question[0].rdtype == dns.rdatatype.A and \
+               message.question[0].rdclass == dns.rdataclass.IN:
+                rrs = dns.rrset.from_text(message.question[0].name, 300,
+                                          'IN', 'A', '1.2.3.4')
+                response.answer.append(rrs)
+                response.set_rcode(dns.rcode.NOERROR)
+                response.flags |= dns.flags.AA
+        except Exception:
+            pass
+        return response
+
+@unittest.skipIf(not _nanonameserver_available,
+                 "Internet and nanonameserver required")
+class TsigTests(unittest.TestCase):
+
+    def test_tsig(self):
+        with TSIGNanoNameserver(keyring=keyring) as ns:
+            qname = dns.name.from_text('example.com')
+            q = dns.message.make_query(qname, 'A')
+            q.use_tsig(keyring=keyring, keyname='name')
+            response = dns.query.udp(q, ns.udp_address[0],
+                                     port=ns.udp_address[1])
+            self.assertTrue(response.had_tsig)
+            rrs = response.get_rrset(response.answer, qname,
+                                     dns.rdataclass.IN, dns.rdatatype.A)
+            self.assertTrue(rrs is not None)
+            seen = set([rdata.address for rdata in rrs])
+            self.assertTrue('1.2.3.4' in seen)