]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Regression test for locked cache
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 28 Sep 2022 13:14:00 +0000 (15:14 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 28 Sep 2022 13:14:00 +0000 (15:14 +0200)
Fixes #11984

regression-tests.recursor-dnssec/recursortests.py
regression-tests.recursor-dnssec/test_LockedCache.py [new file with mode: 0644]

index ecc8f9485cbde5ebcdad206f271034ea7ebb9dfd..2576b0955b5802a2214ab22928b6c365983ad687 100644 (file)
@@ -163,6 +163,8 @@ secure.example.          3600 IN NS   ns.secure.example.
 ns.secure.example.       3600 IN A    {prefix}.9
 secure.example.          3600 IN MX   10 mx1.secure.example.
 secure.example.          3600 IN MX   20 mx2.secure.example.
+sub.secure.example.      3600 IN MX   10 mx1.secure.example.
+sub.secure.example.      3600 IN MX   20 mx2.secure.example.
 
 naptr.secure.example.    60   IN NAPTR   10 10 "a" "X" "A" s1.secure.example.
 naptr.secure.example.    60   IN NAPTR   10 10 "s" "Y" "B" service1.secure.example.
diff --git a/regression-tests.recursor-dnssec/test_LockedCache.py b/regression-tests.recursor-dnssec/test_LockedCache.py
new file mode 100644 (file)
index 0000000..26c955a
--- /dev/null
@@ -0,0 +1,105 @@
+import dns
+import os
+import subprocess
+import time
+
+from recursortests import RecursorTest
+
+class testLockedCache(RecursorTest):
+    """
+    Test that a locked cached entry is *not* updated by the same additional encountered in a second query
+    """
+    _confdir = 'LockedCache'
+
+    _config_template = """
+    dnssec=validate
+    record-cache-locked-ttl-perc=100
+    """
+
+    def getCacheTTL(self):
+        rec_controlCmd = [os.environ['RECCONTROL'],
+                          '--config-dir=%s' % 'configs/' + self._confdir,
+                          'dump-cache',
+                          '-']
+        try:
+            ret = subprocess.check_output(rec_controlCmd, stderr=subprocess.STDOUT, text=True)
+            for i in ret.splitlines():
+                pieces = i.split(' ')
+                print(pieces)
+                if pieces[0] == 'mx1.secure.example.' and pieces[4] == 'A':
+                    return pieces[2]
+            raise AssertionError("Cache Line not found");
+
+        except subprocess.CalledProcessError as e:
+            print(e.output)
+            raise
+
+    def testMX(self):
+        expected1 = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
+        expected2 = dns.rrset.from_text('sub.secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
+        query1 = dns.message.make_query('secure.example', 'MX', want_dnssec=True)
+        query1.flags |= dns.flags.AD
+        query2 = dns.message.make_query('sub.secure.example', 'MX', want_dnssec=True)
+        query2.flags |= dns.flags.AD
+
+        res = self.sendUDPQuery(query1)
+        self.assertMessageIsAuthenticated(res)
+        self.assertRRsetInAnswer(res, expected1)
+        self.assertMatchingRRSIGInAnswer(res, expected1)
+        ttl1 = self.getCacheTTL()
+        time.sleep(2)
+        res = self.sendUDPQuery(query2)
+        self.assertMessageIsAuthenticated(res)
+        self.assertRRsetInAnswer(res, expected2)
+        self.assertMatchingRRSIGInAnswer(res, expected2)
+        ttl2 = self.getCacheTTL()
+        self.assertNotEqual(ttl1, ttl2)
+
+class testNotLockedCache(RecursorTest):
+    """
+    Test that a not locked cached entry *is* updated by the same additional encountered in a second query
+    """
+    _confdir = 'NotLockedCache'
+
+    _config_template = """
+    dnssec=validate
+    """
+
+    def getCacheTTL(self):
+        rec_controlCmd = [os.environ['RECCONTROL'],
+                          '--config-dir=%s' % 'configs/' + self._confdir,
+                          'dump-cache',
+                          '-']
+        try:
+            ret = subprocess.check_output(rec_controlCmd, stderr=subprocess.STDOUT, text=True)
+            for i in ret.splitlines():
+                pieces = i.split(' ')
+                print(pieces)
+                if pieces[0] == 'mx1.secure.example.' and pieces[4] == 'A':
+                    return pieces[2]
+            return -1
+
+        except subprocess.CalledProcessError as e:
+            print(e.output)
+            raise
+
+    def testMX(self):
+        expected1 = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
+        expected2 = dns.rrset.from_text('sub.secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
+        query1 = dns.message.make_query('secure.example', 'MX', want_dnssec=True)
+        query1.flags |= dns.flags.AD
+        query2 = dns.message.make_query('sub.secure.example', 'MX', want_dnssec=True)
+        query2.flags |= dns.flags.AD
+
+        res = self.sendUDPQuery(query1)
+        self.assertMessageIsAuthenticated(res)
+        self.assertRRsetInAnswer(res, expected1)
+        self.assertMatchingRRSIGInAnswer(res, expected1)
+        ttl1 = self.getCacheTTL()
+        time.sleep(2)
+        res = self.sendUDPQuery(query2)
+        self.assertMessageIsAuthenticated(res)
+        self.assertRRsetInAnswer(res, expected2)
+        self.assertMatchingRRSIGInAnswer(res, expected2)
+        ttl2 = self.getCacheTTL()
+        self.assertEqual(ttl1, ttl2)