]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.recursor-dnssec/test_PacketCache.py
rec: dnspython's API changed wrt NSID, apply (version dependent) fix in regression...
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_PacketCache.py
index dfcd8bc115973e6dc3e690fffb09464f2463973e..ba8c102ddde4b97e407817d65d22cdcc976817c4 100644 (file)
@@ -3,6 +3,7 @@ import cookiesoption
 import dns
 import os
 import requests
+import subprocess
 
 from recursortests import RecursorTest
 
@@ -19,7 +20,9 @@ class PacketCacheRecursorTest(RecursorTest):
     _wsPassword = 'secretpassword'
     _apiKey = 'secretapikey'
     _config_template = """
-    packetcache-ttl=60
+    packetcache-ttl=10
+    packetcache-negative-ttl=8
+    packetcache-servfail-ttl=5
     auth-zones=example=configs/%s/example.zone
     webserver=yes
     webserver-port=%d
@@ -39,15 +42,17 @@ b 3600 IN A 192.0.2.42
 c 3600 IN A 192.0.2.42
 d 3600 IN A 192.0.2.42
 e 3600 IN A 192.0.2.42
+f 3600 IN CNAME f            ; CNAME loop: dirty trick to get a ServFail in an authzone
 """.format(soa=cls._SOA))
         super(PacketCacheRecursorTest, cls).generateRecursorConfig(confdir)
 
     def checkPacketCacheMetrics(self, expectedHits, expectedMisses):
+        self.waitForTCPSocket("127.0.0.1", self._wsPort)
         headers = {'x-api-key': self._apiKey}
         url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
         r = requests.get(url, headers=headers, timeout=self._wsTimeout)
         self.assertTrue(r)
-        self.assertEquals(r.status_code, 200)
+        self.assertEqual(r.status_code, 200)
         self.assertTrue(r.json())
         content = r.json()
         foundHits = False
@@ -55,15 +60,16 @@ e 3600 IN A 192.0.2.42
         for entry in content:
             if entry['name'] == 'packetcache-hits':
                 foundHits = True
-                self.assertEquals(int(entry['value']), expectedHits)
+                self.assertEqual(int(entry['value']), expectedHits)
             elif entry['name'] == 'packetcache-misses':
                 foundMisses = True
-                self.assertEquals(int(entry['value']), expectedMisses)
+                self.assertEqual(int(entry['value']), expectedMisses)
 
         self.assertTrue(foundHits)
         self.assertTrue(foundMisses)
 
     def testPacketCache(self):
+        self.waitForTCPSocket("127.0.0.1", self._wsPort)
         # first query, no cookie
         qname = 'a.example.'
         query = dns.message.make_query(qname, 'A', want_dnssec=True)
@@ -136,3 +142,37 @@ e 3600 IN A 192.0.2.42
         self.assertRcodeEqual(res, dns.rcode.NOERROR)
         self.assertRRsetInAnswer(res, expected)
         self.checkPacketCacheMetrics(6, 4)
+
+        # NXDomain should get negative packetcache TTL (8)
+        query = dns.message.make_query('nxdomain.example.', 'A', want_dnssec=True)
+        res = self.sendUDPQuery(query)
+        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
+        self.checkPacketCacheMetrics(6, 5)
+
+        # NoData should get negative packetcache TTL (8)
+        query = dns.message.make_query('a.example.', 'AAAA', want_dnssec=True)
+        res = self.sendUDPQuery(query)
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.checkPacketCacheMetrics(6, 6)
+
+        # ServFail should get ServFail TTL (5)
+        query = dns.message.make_query('f.example.', 'A', want_dnssec=True)
+        res = self.sendUDPQuery(query)
+        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+        self.checkPacketCacheMetrics(6, 7)
+
+        # We peek into the cache to check TTLs and allow TTLs to be one lower than inserted since the clock might have ticked
+        rec_controlCmd = [os.environ['RECCONTROL'],
+                          '--config-dir=%s' % 'configs/' + self._confdir,
+                          'dump-cache', '-']
+        try:
+            ret = subprocess.check_output(rec_controlCmd, stderr=subprocess.STDOUT)
+            self.assertTrue((b"a.example. 10 A  ; tag 0 udp\n" in ret) or (b"a.example. 9 A  ; tag 0 udp\n" in ret))
+            self.assertTrue((b"nxdomain.example. 8 A  ; tag 0 udp\n" in ret) or (b"nxdomain.example. 7 A  ; tag 0 udp\n" in ret))
+            self.assertTrue((b"a.example. 8 AAAA  ; tag 0 udp\n" in ret) or (b"a.example. 7 AAAA  ; tag 0 udp\n" in ret))
+            self.assertTrue((b"f.example. 5 A  ; tag 0 udp\n" in ret) or (b"f.example. 4 A  ; tag 0 udp\n" in ret))
+
+        except subprocess.CalledProcessError as e:
+            print(e.output)
+            raise
+