]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_CDB.py
auth: add options to DomainInfo
[thirdparty/pdns.git] / regression-tests.dnsdist / test_CDB.py
1 #!/usr/bin/env python
2 import cdbx
3 import unittest
4 import dns
5 import os
6 import socket
7 import time
8 from dnsdisttests import DNSDistTest
9
def writeCDB(fname, variant=1):
    """
    Build the CDB database used by the tests and install it as `fname`.

    The database is first written to `fname + '.tmp'` and then renamed onto
    `fname`, presumably so that a reader never opens a half-written file.

    Four records are stored:
    - the packed IPv4 address 127.0.0.<variant>
    - the wire-format name qname.cdb.tests.powerdns.com.
    - the wire-format name suffix.cdb.tests.powerdns.com.
    - a key equal to the value of the qname record, used for chained lookups

    :param fname: final path of the CDB file
    :param variant: last octet of the 127.0.0.x address used as key (default 1)
    """
    tmpName = fname + '.tmp'
    maker = cdbx.CDB.make(tmpName)

    records = (
        (socket.inet_aton(f'127.0.0.{variant}'),
         b'this is the value of the source address tag'),
        (b'\x05qname\x03cdb\x05tests\x08powerdns\x03com\x00',
         b'this is the value of the qname tag'),
        (b'\x06suffix\x03cdb\x05tests\x08powerdns\x03com\x00',
         b'this is the value of the suffix tag'),
        # keyed on the value of the qname record above
        (b'this is the value of the qname tag',
         b'this is the value of the second tag'),
    )
    for key, value in records:
        maker.add(key, value)

    # commit() hands back an object that is closed right away
    committed = maker.commit()
    committed.close()

    os.rename(tmpName, fname)
18
@unittest.skipIf('SKIP_CDB_TESTS' in os.environ, 'CDB tests are disabled')
class CDBTest(DNSDistTest):
    """
    Common dnsdist configuration for the CDB key-value store tests.

    The whole class is skipped when the SKIP_CDB_TESTS environment variable
    is present. The configuration performs several key-value store lookups
    (source IP, qname, tag value, suffix), stores the results in tags, and
    then spoofs a different address depending on which tag matched.
    """

    # path passed as the first argument of newCDBKVStore() in the template below
    _cdbFileName = '/tmp/test-cdb-db'
    # passed as the second argument of newCDBKVStore()
    # presumably the delay, in seconds, between reload checks - TODO confirm
    _cdbRefreshDelay = 1
    # the %d / %s placeholders are filled, in order, from the attributes named
    # in _config_params
    # NOTE(review): the source-IP lookup action appears twice in this template
    # with identical arguments; looks like a harmless duplicate - confirm
    _config_template = """
newServer{address="127.0.0.1:%d"}

kvs = newCDBKVStore('%s', %d)

-- KVS lookups follow
-- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

-- does a lookup in the CDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))

-- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
-- does a lookup in the CDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))

-- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

-- does a lookup in the CDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))

-- Now we take action based on the result of the lookups
-- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))

-- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))

-- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))

-- otherwise, spoof a different response
addAction(AllRule(), SpoofAction('9.9.9.9'))
"""
    # attribute names whose values are substituted into _config_template
    _config_params = ['_testServerPort', '_cdbFileName', '_cdbRefreshDelay']
60
class TestCDBSimple(CDBTest):
    """
    Lookups against a CDB database that is written once, before dnsdist is started.
    """

    @classmethod
    def setUpCDB(cls):
        """
        Write the CDB database in its variant 1 form (keyed on 127.0.0.1).
        """
        writeCDB(cls._cdbFileName, 1)

    @classmethod
    def setUpClass(cls):
        """
        Write the CDB database, then start the responders, dnsdist and the sockets.
        """
        # the database is written before dnsdist is started, presumably so that
        # the file already exists when the configuration is loaded
        cls.setUpCDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def _checkSpoofedAnswer(self, name, address):
        """
        Query `name` (A, IN) over UDP then TCP and check the answer is `address`.

        For each transport this asserts that no query was received, that a
        response was received, and that the response equals a response to the
        query carrying a single A record for `name` with a TTL of 3600 and
        `address` as its value.

        :param name: fully qualified name to query
        :param address: IPv4 address expected in the answer section
        """
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    address)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            # receivedQuery is expected to be empty, presumably because a
            # spoofed answer is never forwarded to the responder
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testCDBSource(self):
        """
        CDB: Match on source address
        """
        # the name is in neither the qname nor the suffix record of the
        # database, so the configuration's source address rule should answer
        self._checkSpoofedAnswer('source-ip.cdb.tests.powerdns.com.', '5.6.7.8')

    def testCDBQNamePlusTagLookup(self):
        """
        CDB: Match on qname then does a second lookup using the value of the first lookup
        """
        self._checkSpoofedAnswer('qname.cdb.tests.powerdns.com.', '1.2.3.4')

    def testCDBSuffixLookup(self):
        """
        CDB: Match on the qname via a suffix lookup
        """
        # the database only holds suffix.cdb.tests.powerdns.com., two labels up
        self._checkSpoofedAnswer('sub.sub.suffix.cdb.tests.powerdns.com.', '42.42.42.42')
145
class TestCDBReload(CDBTest):
    """
    Checks that a CDB database replaced while dnsdist is running is picked up.
    """

    @classmethod
    def setUpCDB(cls):
        """
        Write the initial CDB database (variant 1, keyed on 127.0.0.1).
        """
        writeCDB(cls._cdbFileName, 1)

    @classmethod
    def setUpClass(cls):
        """
        Write the CDB database, then start the responders, dnsdist and the sockets.
        """
        startupSteps = (
            cls.setUpCDB,
            cls.startResponders,
            cls.startDNSDist,
            cls.setUpSockets,
        )
        for step in startupSteps:
            step()

        print("Launching tests..")

    def testCDBReload(self):
        """
        CDB: Test that the CDB is correctly reloaded
        """
        qname = 'reload.cdb.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        def buildExpected(address):
            # response to `query` holding one A record for qname with that address
            response = dns.message.make_response(query)
            record = dns.rrset.from_text(qname,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         address)
            response.answer.append(record)
            return response

        def checkOverBothTransports(expected):
            # same query over UDP then TCP, same three assertions each time
            for sender in (self.sendUDPQuery, self.sendTCPQuery):
                receivedQuery, receivedResponse = sender(query, response=None, useQueue=False)
                self.assertFalse(receivedQuery)
                self.assertTrue(receivedResponse)
                self.assertEqual(expected, receivedResponse)

        # only the source address should match
        checkOverBothTransports(buildExpected('5.6.7.8'))

        # write a new CDB which has no entry for 127.0.0.1
        writeCDB(self._cdbFileName, 2)
        # wait long enough for the CDB database to be reloaded
        time.sleep(self._cdbRefreshDelay + 1)

        # nothing (qname, suffix or source IP) should match
        checkOverBothTransports(buildExpected('9.9.9.9'))