]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_LockedCache.py
Merge pull request #13459 from ukleinek/manpage-link-fix
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_LockedCache.py
1 import dns
2 import os
3 import subprocess
4 import time
5
6 from recursortests import RecursorTest
7
class testLockedCache(RecursorTest):
    """
    Test that a locked cached entry is *not* updated by the same additional encountered in a second query

    The test resolves two MX rrsets that share the same mail exchangers and
    checks, via a cache dump, that the TTL of the cached
    ``mx1.secure.example.`` A record keeps counting down between the two
    queries instead of being refreshed by the second one.
    """
    _confdir = 'LockedCache'

    # NOTE(review): record-cache-locked-ttl-perc=100 presumably keeps a cached
    # record locked for 100% of its TTL -- confirm against the recursor docs.
    _config_template = """
dnssec=validate
record-cache-locked-ttl-perc=100
"""

    def getCacheTTL(self):
        """Return the TTL field of the cached ``mx1.secure.example.`` A record.

        Runs ``rec_control dump-cache -`` (binary taken from the
        ``RECCONTROL`` environment variable) against this test's config
        directory and scans the dump line by line.

        Returns:
            The third space-separated field of the matching line, as an int.

        Raises:
            AssertionError: if no matching line is present in the dump.
            subprocess.CalledProcessError: if ``rec_control`` exits non-zero
                (its output is printed first to ease debugging).
        """
        rec_controlCmd = [os.environ['RECCONTROL'],
                          '--config-dir=configs/%s' % self._confdir,
                          'dump-cache',
                          '-']
        try:
            ret = subprocess.check_output(rec_controlCmd, stderr=subprocess.STDOUT, text=True)
        except subprocess.CalledProcessError as e:
            print(e.output)
            raise

        for line in ret.splitlines():
            pieces = line.split(' ')
            print(pieces)
            # Guard the length first: short lines (blank lines, terse
            # comments) must be skipped rather than crash on pieces[4].
            if len(pieces) > 4 and pieces[0] == 'mx1.secure.example.' and pieces[4] == 'A':
                # Convert to int so callers compare TTLs numerically; as
                # strings, "100" would sort *below* "98".
                return int(pieces[2])
        raise AssertionError("Cache Line not found")

    def testMX(self):
        """Query two MX rrsets sharing mx1/mx2 and check the cached TTL only decreases."""
        expected1 = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
        expected2 = dns.rrset.from_text('sub.secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
        query1 = dns.message.make_query('secure.example', 'MX', want_dnssec=True)
        query1.flags |= dns.flags.AD
        query2 = dns.message.make_query('sub.secure.example', 'MX', want_dnssec=True)
        query2.flags |= dns.flags.AD

        # First query puts the mx1 A record into the cache.
        res = self.sendUDPQuery(query1)
        self.assertMessageIsAuthenticated(res)
        self.assertRRsetInAnswer(res, expected1)
        self.assertMatchingRRSIGInAnswer(res, expected1)
        ttl1 = self.getCacheTTL()

        # Let the cached TTL visibly count down before the second query.
        time.sleep(2)

        res = self.sendUDPQuery(query2)
        self.assertMessageIsAuthenticated(res)
        self.assertRRsetInAnswer(res, expected2)
        self.assertMatchingRRSIGInAnswer(res, expected2)
        ttl2 = self.getCacheTTL()

        # A locked entry is not replaced, so its TTL must have gone down.
        self.assertGreater(ttl1, ttl2)
57
class testNotLockedCache(RecursorTest):
    """
    Test that a not locked cached entry *is* updated by the same additional encountered in a second query

    The test resolves two MX rrsets that share the same mail exchangers and
    checks, via a cache dump, that the TTL of the cached
    ``mx1.secure.example.`` A record after the second query is (almost) the
    same as after the first one, i.e. the entry was refreshed.
    """
    _confdir = 'NotLockedCache'

    _config_template = """
dnssec=validate
"""

    def getCacheTTL(self):
        """Return the TTL field of the cached ``mx1.secure.example.`` A record.

        Runs ``rec_control dump-cache -`` (binary taken from the
        ``RECCONTROL`` environment variable) against this test's config
        directory and scans the dump line by line.

        Returns:
            The third space-separated field of the matching line, as an int.

        Raises:
            AssertionError: if no matching line is present in the dump.
            subprocess.CalledProcessError: if ``rec_control`` exits non-zero
                (its output is printed first to ease debugging).
        """
        rec_controlCmd = [os.environ['RECCONTROL'],
                          '--config-dir=configs/%s' % self._confdir,
                          'dump-cache',
                          '-']
        try:
            ret = subprocess.check_output(rec_controlCmd, stderr=subprocess.STDOUT, text=True)
        except subprocess.CalledProcessError as e:
            print(e.output)
            raise

        for line in ret.splitlines():
            pieces = line.split(' ')
            print(pieces)
            # Guard the length first: short lines (blank lines, terse
            # comments) must be skipped rather than crash on pieces[4].
            if len(pieces) > 4 and pieces[0] == 'mx1.secure.example.' and pieces[4] == 'A':
                return int(pieces[2])
        # Fail loudly instead of returning a sentinel: two "not found"
        # results would compare equal and let testMX pass vacuously.
        raise AssertionError("Cache Line not found")

    def testMX(self):
        """Query two MX rrsets sharing mx1/mx2 and check the cached TTL was refreshed."""
        expected1 = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
        expected2 = dns.rrset.from_text('sub.secure.example.', 0, dns.rdataclass.IN, 'MX', '10 mx1.secure.example.', '20 mx2.secure.example.')
        query1 = dns.message.make_query('secure.example', 'MX', want_dnssec=True)
        query1.flags |= dns.flags.AD
        query2 = dns.message.make_query('sub.secure.example', 'MX', want_dnssec=True)
        query2.flags |= dns.flags.AD

        # First query puts the mx1 A record into the cache.
        res = self.sendUDPQuery(query1)
        self.assertMessageIsAuthenticated(res)
        self.assertRRsetInAnswer(res, expected1)
        self.assertMatchingRRSIGInAnswer(res, expected1)
        ttl1 = self.getCacheTTL()

        # Let the cached TTL count down so a refresh becomes observable.
        time.sleep(2)

        res = self.sendUDPQuery(query2)
        self.assertMessageIsAuthenticated(res)
        self.assertRRsetInAnswer(res, expected2)
        self.assertMatchingRRSIGInAnswer(res, expected2)
        ttl2 = self.getCacheTTL()

        # An unlocked entry is replaced by the second answer, so the TTL is
        # back near its initial value despite the 2 second sleep.
        self.assertAlmostEqual(ttl1, ttl2, delta=1)