From: Brian Wellington Date: Tue, 16 Jun 2020 20:42:31 +0000 (-0700) Subject: Test sending a query with TSIG. X-Git-Tag: v2.0.0rc1~84 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e3c7fa8b24b0f7c2d5889f49dc4b72b749d331d3;p=thirdparty%2Fdnspython.git Test sending a query with TSIG. --- diff --git a/tests/test_query.py b/tests/test_query.py index df52dfe5..ec490f1c 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -31,6 +31,7 @@ import dns.name import dns.rdataclass import dns.rdatatype import dns.query +import dns.tsigkeyring import dns.zone # Some tests require the internet to be available to run, so let's @@ -65,6 +66,8 @@ for (af, address) in ((socket.AF_INET, '8.8.8.8'), except Exception: pass +keyring = dns.tsigkeyring.from_text({'name' : 'tDz6cfXXGtNivRpQ98hr6A=='}) + @unittest.skipIf(not _network_available, "Internet not reachable") class QueryTests(unittest.TestCase): @@ -392,3 +395,39 @@ class XfrTests(unittest.TestCase): relativize=False) l = list(xfr) self.assertRaises(dns.exception.FormError, bad) + +class TSIGNanoNameserver(Server): + + def handle(self, message, peer, connection_type): + response = dns.message.make_response(message) + response.set_rcode(dns.rcode.REFUSED) + response.flags |= dns.flags.RA + try: + if message.question[0].rdtype == dns.rdatatype.A and \ + message.question[0].rdclass == dns.rdataclass.IN: + rrs = dns.rrset.from_text(message.question[0].name, 300, + 'IN', 'A', '1.2.3.4') + response.answer.append(rrs) + response.set_rcode(dns.rcode.NOERROR) + response.flags |= dns.flags.AA + except Exception: + pass + return response + +@unittest.skipIf(not _nanonameserver_available, + "Internet and nanonameserver required") +class TsigTests(unittest.TestCase): + + def test_tsig(self): + with TSIGNanoNameserver(keyring=keyring) as ns: + qname = dns.name.from_text('example.com') + q = dns.message.make_query(qname, 'A') + q.use_tsig(keyring=keyring, keyname='name') + response = dns.query.udp(q, ns.udp_address[0], + port=ns.udp_address[1]) + self.assertTrue(response.had_tsig) + rrs = response.get_rrset(response.answer, qname, + dns.rdataclass.IN, dns.rdatatype.A) + self.assertTrue(rrs is not None) + seen = set([rdata.address for rdata in rrs]) + self.assertTrue('1.2.3.4' in seen)