]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_CDB.py
Merge pull request #13594 from rgacogne/ddist-http1
[thirdparty/pdns.git] / regression-tests.dnsdist / test_CDB.py
CommitLineData
d41bbe68 1#!/usr/bin/env python
583c5c5d 2import cdbx
d41bbe68
RG
3import unittest
4import dns
13291274 5import os
d41bbe68
RG
6import socket
7import time
8from dnsdisttests import DNSDistTest
9
583c5c5d
PD
10def writeCDB(fname, variant=1):
11 cdb = cdbx.CDB.make(fname+'.tmp')
12 cdb.add(socket.inet_aton(f'127.0.0.{variant}'), b'this is the value of the source address tag')
13 cdb.add(b'\x05qname\x03cdb\x05tests\x08powerdns\x03com\x00', b'this is the value of the qname tag')
14 cdb.add(b'\x06suffix\x03cdb\x05tests\x08powerdns\x03com\x00', b'this is the value of the suffix tag')
15 cdb.add(b'this is the value of the qname tag', b'this is the value of the second tag')
16 cdb.commit().close()
17 os.rename(fname+'.tmp', fname)
dbb2e877 18 cdb.close()
583c5c5d 19
13291274 20@unittest.skipIf('SKIP_CDB_TESTS' in os.environ, 'CDB tests are disabled')
d41bbe68
RG
21class CDBTest(DNSDistTest):
22
23 _cdbFileName = '/tmp/test-cdb-db'
24 _cdbRefreshDelay = 1
25 _config_template = """
26 newServer{address="127.0.0.1:%d"}
27
28 kvs = newCDBKVStore('%s', %d)
29
30 -- KVS lookups follow
31 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
32 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
33
34 -- does a lookup in the CDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
35 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))
36
37 -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
38 -- does a lookup in the CDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
39 addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))
40
41 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
42 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
43
44 -- does a lookup in the CDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
45 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))
46
47 -- Now we take action based on the result of the lookups
48 -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
49 addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))
50
51 -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
52 addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))
53
54 -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
55 addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))
56
57 -- otherwise, spoof a different response
58 addAction(AllRule(), SpoofAction('9.9.9.9'))
59 """
60 _config_params = ['_testServerPort', '_cdbFileName', '_cdbRefreshDelay']
61
62class TestCDBSimple(CDBTest):
63
64 @classmethod
65 def setUpCDB(cls):
583c5c5d 66 writeCDB(cls._cdbFileName, 1)
d41bbe68
RG
67
68 @classmethod
69 def setUpClass(cls):
70
71 cls.setUpCDB()
72 cls.startResponders()
73 cls.startDNSDist()
74 cls.setUpSockets()
75
76 print("Launching tests..")
77
78 def testCDBSource(self):
79 """
80 CDB: Match on source address
81 """
82 name = 'source-ip.cdb.tests.powerdns.com.'
83 query = dns.message.make_query(name, 'A', 'IN')
84 # dnsdist set RA = RD for spoofed responses
85 query.flags &= ~dns.flags.RD
86 expectedResponse = dns.message.make_response(query)
87 rrset = dns.rrset.from_text(name,
88 3600,
89 dns.rdataclass.IN,
90 dns.rdatatype.A,
91 '5.6.7.8')
92 expectedResponse.answer.append(rrset)
93
94 for method in ("sendUDPQuery", "sendTCPQuery"):
95 sender = getattr(self, method)
96 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
97 self.assertFalse(receivedQuery)
98 self.assertTrue(receivedResponse)
4bfebc93 99 self.assertEqual(expectedResponse, receivedResponse)
d41bbe68
RG
100
101 def testCDBQNamePlusTagLookup(self):
102 """
103 CDB: Match on qname then does a second lookup using the value of the first lookup
104 """
105 name = 'qname.cdb.tests.powerdns.com.'
106 query = dns.message.make_query(name, 'A', 'IN')
107 # dnsdist set RA = RD for spoofed responses
108 query.flags &= ~dns.flags.RD
109 expectedResponse = dns.message.make_response(query)
110 rrset = dns.rrset.from_text(name,
111 3600,
112 dns.rdataclass.IN,
113 dns.rdatatype.A,
114 '1.2.3.4')
115 expectedResponse.answer.append(rrset)
116
117 for method in ("sendUDPQuery", "sendTCPQuery"):
118 sender = getattr(self, method)
119 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
120 self.assertFalse(receivedQuery)
121 self.assertTrue(receivedResponse)
4bfebc93 122 self.assertEqual(expectedResponse, receivedResponse)
d41bbe68
RG
123
124 def testCDBSuffixLookup(self):
125 """
126 CDB: Match on the qname via a suffix lookup
127 """
128 name = 'sub.sub.suffix.cdb.tests.powerdns.com.'
129 query = dns.message.make_query(name, 'A', 'IN')
130 # dnsdist set RA = RD for spoofed responses
131 query.flags &= ~dns.flags.RD
132 expectedResponse = dns.message.make_response(query)
133 rrset = dns.rrset.from_text(name,
134 3600,
135 dns.rdataclass.IN,
136 dns.rdatatype.A,
137 '42.42.42.42')
138 expectedResponse.answer.append(rrset)
139
140 for method in ("sendUDPQuery", "sendTCPQuery"):
141 sender = getattr(self, method)
142 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
143 self.assertFalse(receivedQuery)
144 self.assertTrue(receivedResponse)
4bfebc93 145 self.assertEqual(expectedResponse, receivedResponse)
d41bbe68
RG
146
147class TestCDBReload(CDBTest):
148
149 @classmethod
150 def setUpCDB(cls):
583c5c5d 151 writeCDB(cls._cdbFileName, 1)
d41bbe68
RG
152
153 @classmethod
154 def setUpClass(cls):
155
156 cls.setUpCDB()
157 cls.startResponders()
158 cls.startDNSDist()
159 cls.setUpSockets()
160
161 print("Launching tests..")
162
163 def testCDBReload(self):
164 """
165 CDB: Test that the CDB is correctly reloaded
166 """
167 name = 'reload.cdb.tests.powerdns.com.'
168 query = dns.message.make_query(name, 'A', 'IN')
169 # dnsdist set RA = RD for spoofed responses
170 query.flags &= ~dns.flags.RD
171 expectedResponse = dns.message.make_response(query)
172 rrset = dns.rrset.from_text(name,
173 3600,
174 dns.rdataclass.IN,
175 dns.rdatatype.A,
176 '5.6.7.8')
177 expectedResponse.answer.append(rrset)
178
179 # only the source address should match
180 for method in ("sendUDPQuery", "sendTCPQuery"):
181 sender = getattr(self, method)
182 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
183 self.assertFalse(receivedQuery)
184 self.assertTrue(receivedResponse)
4bfebc93 185 self.assertEqual(expectedResponse, receivedResponse)
d41bbe68 186
583c5c5d 187 # write a new CDB which has no entry for 127.0.0.1
d8f842ee
RG
188 # first ensure that the mtime will change after writing
189 # the new version
190 time.sleep(1)
583c5c5d 191 writeCDB(self._cdbFileName, 2)
d41bbe68
RG
192 # wait long enough for the CDB database to be reloaded
193 time.sleep(self._cdbRefreshDelay + 1)
194
195 expectedResponse = dns.message.make_response(query)
196 rrset = dns.rrset.from_text(name,
197 3600,
198 dns.rdataclass.IN,
199 dns.rdatatype.A,
200 '9.9.9.9')
201 expectedResponse.answer.append(rrset)
202
203 # nothing (qname, suffix or source IP) should match
204 for method in ("sendUDPQuery", "sendTCPQuery"):
205 sender = getattr(self, method)
206 (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
207 self.assertFalse(receivedQuery)
208 self.assertTrue(receivedResponse)
4bfebc93 209 self.assertEqual(expectedResponse, receivedResponse)