]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_CDB.py
Merge remote-tracking branch 'origin' into errno-review
[thirdparty/pdns.git] / regression-tests.dnsdist / test_CDB.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import shutil
5 import socket
6 import time
7 from dnsdisttests import DNSDistTest
8
9 class CDBTest(DNSDistTest):
10
11 _cdbFileName = '/tmp/test-cdb-db'
12 _cdbRefreshDelay = 1
13 _config_template = """
14 newServer{address="127.0.0.1:%d"}
15
16 kvs = newCDBKVStore('%s', %d)
17
18 -- KVS lookups follow
19 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
20 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
21
22 -- does a lookup in the CDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
23 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))
24
25 -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
26 -- does a lookup in the CDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
27 addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))
28
29 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
30 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
31
32 -- does a lookup in the CDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
33 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))
34
35 -- Now we take action based on the result of the lookups
36 -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
37 addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))
38
39 -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
40 addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))
41
42 -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
43 addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))
44
45 -- otherwise, spoof a different response
46 addAction(AllRule(), SpoofAction('9.9.9.9'))
47 """
48 _config_params = ['_testServerPort', '_cdbFileName', '_cdbRefreshDelay']
49
50 class TestCDBSimple(CDBTest):
51
52 @classmethod
53 def setUpCDB(cls):
54 shutil.copyfile('kvs.cdb.1', cls._cdbFileName)
55
56 @classmethod
57 def setUpClass(cls):
58
59 cls.setUpCDB()
60 cls.startResponders()
61 cls.startDNSDist()
62 cls.setUpSockets()
63
64 print("Launching tests..")
65
66 def testCDBSource(self):
67 """
68 CDB: Match on source address
69 """
70 name = 'source-ip.cdb.tests.powerdns.com.'
71 query = dns.message.make_query(name, 'A', 'IN')
72 # dnsdist set RA = RD for spoofed responses
73 query.flags &= ~dns.flags.RD
74 expectedResponse = dns.message.make_response(query)
75 rrset = dns.rrset.from_text(name,
76 3600,
77 dns.rdataclass.IN,
78 dns.rdatatype.A,
79 '5.6.7.8')
80 expectedResponse.answer.append(rrset)
81
82 for method in ("sendUDPQuery", "sendTCPQuery"):
83 sender = getattr(self, method)
84 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
85 self.assertFalse(receivedQuery)
86 self.assertTrue(receivedResponse)
87 self.assertEquals(expectedResponse, receivedResponse)
88
89 def testCDBQNamePlusTagLookup(self):
90 """
91 CDB: Match on qname then does a second lookup using the value of the first lookup
92 """
93 name = 'qname.cdb.tests.powerdns.com.'
94 query = dns.message.make_query(name, 'A', 'IN')
95 # dnsdist set RA = RD for spoofed responses
96 query.flags &= ~dns.flags.RD
97 expectedResponse = dns.message.make_response(query)
98 rrset = dns.rrset.from_text(name,
99 3600,
100 dns.rdataclass.IN,
101 dns.rdatatype.A,
102 '1.2.3.4')
103 expectedResponse.answer.append(rrset)
104
105 for method in ("sendUDPQuery", "sendTCPQuery"):
106 sender = getattr(self, method)
107 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
108 self.assertFalse(receivedQuery)
109 self.assertTrue(receivedResponse)
110 self.assertEquals(expectedResponse, receivedResponse)
111
112 def testCDBSuffixLookup(self):
113 """
114 CDB: Match on the qname via a suffix lookup
115 """
116 name = 'sub.sub.suffix.cdb.tests.powerdns.com.'
117 query = dns.message.make_query(name, 'A', 'IN')
118 # dnsdist set RA = RD for spoofed responses
119 query.flags &= ~dns.flags.RD
120 expectedResponse = dns.message.make_response(query)
121 rrset = dns.rrset.from_text(name,
122 3600,
123 dns.rdataclass.IN,
124 dns.rdatatype.A,
125 '42.42.42.42')
126 expectedResponse.answer.append(rrset)
127
128 for method in ("sendUDPQuery", "sendTCPQuery"):
129 sender = getattr(self, method)
130 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
131 self.assertFalse(receivedQuery)
132 self.assertTrue(receivedResponse)
133 self.assertEquals(expectedResponse, receivedResponse)
134
135 class TestCDBReload(CDBTest):
136
137 @classmethod
138 def setUpCDB(cls):
139 shutil.copyfile('kvs.cdb.1', cls._cdbFileName)
140
141 @classmethod
142 def setUpClass(cls):
143
144 cls.setUpCDB()
145 cls.startResponders()
146 cls.startDNSDist()
147 cls.setUpSockets()
148
149 print("Launching tests..")
150
151 def testCDBReload(self):
152 """
153 CDB: Test that the CDB is correctly reloaded
154 """
155 name = 'reload.cdb.tests.powerdns.com.'
156 query = dns.message.make_query(name, 'A', 'IN')
157 # dnsdist set RA = RD for spoofed responses
158 query.flags &= ~dns.flags.RD
159 expectedResponse = dns.message.make_response(query)
160 rrset = dns.rrset.from_text(name,
161 3600,
162 dns.rdataclass.IN,
163 dns.rdatatype.A,
164 '5.6.7.8')
165 expectedResponse.answer.append(rrset)
166
167 # only the source address should match
168 for method in ("sendUDPQuery", "sendTCPQuery"):
169 sender = getattr(self, method)
170 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
171 self.assertFalse(receivedQuery)
172 self.assertTrue(receivedResponse)
173 self.assertEquals(expectedResponse, receivedResponse)
174
175 # switch to the second DB which has no entry for 127.0.0.1
176 shutil.copyfile('kvs.cdb.2', self._cdbFileName)
177 # wait long enough for the CDB database to be reloaded
178 time.sleep(self._cdbRefreshDelay + 1)
179
180 expectedResponse = dns.message.make_response(query)
181 rrset = dns.rrset.from_text(name,
182 3600,
183 dns.rdataclass.IN,
184 dns.rdatatype.A,
185 '9.9.9.9')
186 expectedResponse.answer.append(rrset)
187
188 # nothing (qname, suffix or source IP) should match
189 for method in ("sendUDPQuery", "sendTCPQuery"):
190 sender = getattr(self, method)
191 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
192 self.assertFalse(receivedQuery)
193 self.assertTrue(receivedResponse)
194 self.assertEquals(expectedResponse, receivedResponse)