import dns.rdataclass
import dns.rdatatype
import dns.query
+import dns.tsigkeyring
import dns.zone
# Some tests require the internet to be available to run, so let's
except Exception:
pass
+keyring = dns.tsigkeyring.from_text({'name' : 'tDz6cfXXGtNivRpQ98hr6A=='})
+
@unittest.skipIf(not _network_available, "Internet not reachable")
class QueryTests(unittest.TestCase):
relativize=False)
l = list(xfr)
self.assertRaises(dns.exception.FormError, bad)
+
+class TSIGNanoNameserver(Server):
+
+ def handle(self, message, peer, connection_type):
+ response = dns.message.make_response(message)
+ response.set_rcode(dns.rcode.REFUSED)
+ response.flags |= dns.flags.RA
+ try:
+ if message.question[0].rdtype == dns.rdatatype.A and \
+ message.question[0].rdclass == dns.rdataclass.IN:
+ rrs = dns.rrset.from_text(message.question[0].name, 300,
+ 'IN', 'A', '1.2.3.4')
+ response.answer.append(rrs)
+ response.set_rcode(dns.rcode.NOERROR)
+ response.flags |= dns.flags.AA
+ except Exception:
+ pass
+ return response
+
+@unittest.skipIf(not _nanonameserver_available,
+ "Internet and nanonameserver required")
+class TsigTests(unittest.TestCase):
+
+ def test_tsig(self):
+ with TSIGNanoNameserver(keyring=keyring) as ns:
+ qname = dns.name.from_text('example.com')
+ q = dns.message.make_query(qname, 'A')
+ q.use_tsig(keyring=keyring, keyname='name')
+ response = dns.query.udp(q, ns.udp_address[0],
+ port=ns.udp_address[1])
+ self.assertTrue(response.had_tsig)
+ rrs = response.get_rrset(response.answer, qname,
+ dns.rdataclass.IN, dns.rdatatype.A)
+ self.assertTrue(rrs is not None)
+ seen = set([rdata.address for rdata in rrs])
+ self.assertTrue('1.2.3.4' in seen)