import dns
import os
import requests
+import subprocess
from recursortests import RecursorTest
_wsPassword = 'secretpassword'
_apiKey = 'secretapikey'
_config_template = """
- packetcache-ttl=60
+ packetcache-ttl=10
+ packetcache-negative-ttl=8
+ packetcache-servfail-ttl=5
auth-zones=example=configs/%s/example.zone
webserver=yes
webserver-port=%d
c 3600 IN A 192.0.2.42
d 3600 IN A 192.0.2.42
e 3600 IN A 192.0.2.42
+f 3600 IN CNAME f ; CNAME loop: dirty trick to get a ServFail in an authzone
""".format(soa=cls._SOA))
super(PacketCacheRecursorTest, cls).generateRecursorConfig(confdir)
def checkPacketCacheMetrics(self, expectedHits, expectedMisses):
+ self.waitForTCPSocket("127.0.0.1", self._wsPort)
headers = {'x-api-key': self._apiKey}
url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
r = requests.get(url, headers=headers, timeout=self._wsTimeout)
self.assertTrue(r)
- self.assertEquals(r.status_code, 200)
+ self.assertEqual(r.status_code, 200)
self.assertTrue(r.json())
content = r.json()
foundHits = False
for entry in content:
if entry['name'] == 'packetcache-hits':
foundHits = True
- self.assertEquals(int(entry['value']), expectedHits)
+ self.assertEqual(int(entry['value']), expectedHits)
elif entry['name'] == 'packetcache-misses':
foundMisses = True
- self.assertEquals(int(entry['value']), expectedMisses)
+ self.assertEqual(int(entry['value']), expectedMisses)
self.assertTrue(foundHits)
self.assertTrue(foundMisses)
def testPacketCache(self):
+ self.waitForTCPSocket("127.0.0.1", self._wsPort)
# first query, no cookie
qname = 'a.example.'
query = dns.message.make_query(qname, 'A', want_dnssec=True)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertRRsetInAnswer(res, expected)
self.checkPacketCacheMetrics(6, 4)
+
+ # NXDomain should get negative packetcache TTL (8)
+ query = dns.message.make_query('nxdomain.example.', 'A', want_dnssec=True)
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+ self.checkPacketCacheMetrics(6, 5)
+
+ # NoData should get negative packetcache TTL (8)
+ query = dns.message.make_query('a.example.', 'AAAA', want_dnssec=True)
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.checkPacketCacheMetrics(6, 6)
+
+ # ServFail should get ServFail TTL (5)
+ query = dns.message.make_query('f.example.', 'A', want_dnssec=True)
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.checkPacketCacheMetrics(6, 7)
+
+ # We peek into the cache to check TTLs and allow TTLs to be one lower than inserted since the clock might have ticked
+ rec_controlCmd = [os.environ['RECCONTROL'],
+ '--config-dir=%s' % 'configs/' + self._confdir,
+ 'dump-cache', '-']
+ try:
+ ret = subprocess.check_output(rec_controlCmd, stderr=subprocess.STDOUT)
+ self.assertTrue((b"a.example. 10 A ; tag 0 udp\n" in ret) or (b"a.example. 9 A ; tag 0 udp\n" in ret))
+ self.assertTrue((b"nxdomain.example. 8 A ; tag 0 udp\n" in ret) or (b"nxdomain.example. 7 A ; tag 0 udp\n" in ret))
+ self.assertTrue((b"a.example. 8 AAAA ; tag 0 udp\n" in ret) or (b"a.example. 7 AAAA ; tag 0 udp\n" in ret))
+ self.assertTrue((b"f.example. 5 A ; tag 0 udp\n" in ret) or (b"f.example. 4 A ; tag 0 udp\n" in ret))
+
+ except subprocess.CalledProcessError as e:
+ print(e.output)
+ raise
+