git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_CDB.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_CDB.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import os
5 import shutil
6 import socket
7 import time
8 from dnsdisttests import DNSDistTest
9
@unittest.skipIf('SKIP_CDB_TESTS' in os.environ, 'CDB tests are disabled')
class CDBTest(DNSDistTest):
    """Common dnsdist configuration for the CDB key-value store tests.

    The Lua configuration below opens a CDB store, runs several lookups
    (source IP, qname, tag value, qname suffix) that record their results
    in tags, and then spoofs a different A record depending on which tag
    holds the expected value.  The whole class is skipped when the
    SKIP_CDB_TESTS environment variable is set.
    """

    # Path of the CDB file passed to newCDBKVStore() in the configuration.
    _cdbFileName = '/tmp/test-cdb-db'
    # Second argument of newCDBKVStore(); the reload test treats it as a
    # number of seconds when waiting for the database to be re-read.
    _cdbRefreshDelay = 1
    # NOTE(review): presumably DNSDistTest interpolates these attributes, in
    # this order, into the %-placeholders of _config_template -- confirm
    # against dnsdisttests.
    _config_params = ['_testServerPort', '_cdbFileName', '_cdbRefreshDelay']
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    kvs = newCDBKVStore('%s', %d)

    -- KVS lookups follow
    -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

    -- does a lookup in the CDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))

    -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
    -- does a lookup in the CDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
    addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))

    -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

    -- does a lookup in the CDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))

    -- Now we take action based on the result of the lookups
    -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
    addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))

    -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
    addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))

    -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
    addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))

    -- otherwise, spoof a different response
    addAction(AllRule(), SpoofAction('9.9.9.9'))
    """
51
class TestCDBSimple(CDBTest):
    """Checks that each kind of CDB lookup selects the expected spoofed answer."""

    @classmethod
    def setUpCDB(cls):
        """Copy the 'kvs.cdb.1' fixture to the path dnsdist is configured with."""
        shutil.copyfile('kvs.cdb.1', cls._cdbFileName)

    @classmethod
    def setUpClass(cls):
        """Install the CDB file, then start the responders, dnsdist and the sockets."""
        # the CDB file is put in place before dnsdist is started
        cls.setUpCDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def _checkSpoofedAnswer(self, name, address):
        """Query `name` over UDP and TCP and expect a spoofed A record.

        Builds an A/IN query for `name` with the RD flag cleared, derives the
        expected response (one A record for `name`, TTL 3600, pointing to
        `address`) and asserts, for both transports, that no query was
        received, that a response was received, and that the response equals
        the expected one.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           3600,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           address))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            # assertEqual, not the assertEquals alias removed in Python 3.12
            self.assertEqual(expectedResponse, receivedResponse)

    def testCDBSource(self):
        """
        CDB: Match on source address
        """
        self._checkSpoofedAnswer('source-ip.cdb.tests.powerdns.com.', '5.6.7.8')

    def testCDBQNamePlusTagLookup(self):
        """
        CDB: Match on qname then does a second lookup using the value of the first lookup
        """
        self._checkSpoofedAnswer('qname.cdb.tests.powerdns.com.', '1.2.3.4')

    def testCDBSuffixLookup(self):
        """
        CDB: Match on the qname via a suffix lookup
        """
        self._checkSpoofedAnswer('sub.sub.suffix.cdb.tests.powerdns.com.', '42.42.42.42')
136
class TestCDBReload(CDBTest):
    """Checks that dnsdist picks up a replaced CDB file after the refresh delay."""

    @classmethod
    def setUpCDB(cls):
        """Copy the 'kvs.cdb.1' fixture to the path dnsdist is configured with."""
        shutil.copyfile('kvs.cdb.1', cls._cdbFileName)

    @classmethod
    def setUpClass(cls):
        """Install the CDB file, then start the responders, dnsdist and the sockets."""
        # the CDB file is put in place before dnsdist is started
        cls.setUpCDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def _expectSpoofedAddress(self, query, name, address):
        """Send `query` over UDP and TCP and expect a spoofed A record.

        The expected response is derived from `query` and carries one A
        record for `name` (TTL 3600) pointing to `address`.  For both
        transports this asserts that no query was received, that a response
        was received, and that the response equals the expected one.
        """
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           3600,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           address))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            # assertEqual, not the assertEquals alias removed in Python 3.12
            self.assertEqual(expectedResponse, receivedResponse)

    def testCDBReload(self):
        """
        CDB: Test that the CDB is correctly reloaded
        """
        name = 'reload.cdb.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        # only the source address should match
        self._expectSpoofedAddress(query, name, '5.6.7.8')

        # switch to the second DB which has no entry for 127.0.0.1
        shutil.copyfile('kvs.cdb.2', self._cdbFileName)
        # wait long enough for the CDB database to be reloaded
        time.sleep(self._cdbRefreshDelay + 1)

        # nothing (qname, suffix or source IP) should match
        self._expectSpoofedAddress(query, name, '9.9.9.9')