]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_LMDB.py
Merge pull request #13851 from dwfreed/fix-alpn-selection
[thirdparty/pdns.git] / regression-tests.dnsdist / test_LMDB.py
CommitLineData
5fc8a17f
RG
1#!/usr/bin/env python
2import unittest
3import dns
4import lmdb
13291274 5import os
5fc8a17f 6import socket
162abc50
RG
7import struct
8
5fc8a17f
RG
9from dnsdisttests import DNSDistTest
10
@unittest.skipIf('SKIP_LMDB_TESTS' in os.environ, 'LMDB tests are disabled')
class TestLMDB(DNSDistTest):
    """
    Regression tests for dnsdist's LMDB-backed key-value store.

    The configuration below chains several KVS lookups (source IP, qname in
    plain text and wire format, tag value, qname suffix) and spoofs a
    different A record depending on which lookup produced a match. Each test
    sends a query and checks which spoofed address comes back.
    """

    _lmdbFileName = '/tmp/test-lmdb-db'
    _lmdbDBName = 'db-name'
    # NOTE(review): the source IP lookup is registered twice in this template;
    # it is kept as-is since the rule order is part of what the tests exercise.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    kvs = newLMDBKVStore('%s', '%s')

    -- KVS lookups follow
    -- if the qname is 'kvs-rule.lmdb.tests.powerdns.com.', does a lookup in the LMDB database using the qname as key, and spoof an answer if it matches
    addAction(AndRule{QNameRule('kvs-rule.lmdb.tests.powerdns.com.'), KeyValueStoreLookupRule(kvs, KeyValueLookupKeyQName(false))}, SpoofAction('13.14.15.16'))

    -- does a lookup in the LMDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

    -- does a lookup in the LMDB database using the qname in _plain text_ format as key, and store the result into the 'kvs-plain-text-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(false), 'kvs-plain-text-result'))
    -- if the value of the 'kvs-plain-text-result' is set to 'this is the value of the plaintext tag', spoof a response
    addAction(TagRule('kvs-plain-text-result', 'this is the value of the plaintext tag'), SpoofAction('9.10.11.12'))

    -- does a lookup in the LMDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))

    -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
    -- does a lookup in the LMDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
    addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))

    -- does a lookup in the LMDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

    -- does a lookup in the LMDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))

    -- Now we take action based on the result of the lookups
    -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
    addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))

    -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
    addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))

    -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
    addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))

    -- otherwise, spoof a different response
    addAction(AllRule(), SpoofAction('9.9.9.9'))
    """
    _config_params = ['_testServerPort', '_lmdbFileName', '_lmdbDBName']

    @classmethod
    def setUpLMDB(cls):
        """
        Create the LMDB file and store the records the configured lookups expect.
        """
        # 1 MiB map: LMDB asks for a map size that is a multiple of the OS page
        # size, which the former 1014*1024 was not.
        # The environment is used as a context manager so that it is closed
        # once the records are committed, instead of leaking the handle.
        with lmdb.open(cls._lmdbFileName, map_size=1024*1024, max_dbs=1024, subdir=False) as env:
            db = env.open_db(key=cls._lmdbDBName.encode())
            with env.begin(db=db, write=True) as txn:
                # qname in DNS wire format (length-prefixed labels)
                txn.put(b'\x05qname\x04lmdb\x05tests\x08powerdns\x03com\x00', b'this is the value of the qname tag')
                # source address as 4 raw bytes
                txn.put(socket.inet_aton('127.0.0.1'), b'this is the value of the source address tag')
                # the value of the qname record is itself a key, for the tag-based second lookup
                txn.put(b'this is the value of the qname tag', b'this is the value of the second tag')
                # parent name matched by the suffix lookup
                txn.put(b'\x06suffix\x04lmdb\x05tests\x08powerdns\x03com\x00', b'this is the value of the suffix tag')
                # plain-text qname keys, stored without the trailing dot
                txn.put(b'qname-plaintext.lmdb.tests.powerdns.com', b'this is the value of the plaintext tag')
                txn.put(b'kvs-rule.lmdb.tests.powerdns.com', b'the value does not matter')

    @classmethod
    def setUpClass(cls):
        """
        Populate the LMDB database, then start the responders, dnsdist and the sockets.
        """
        # the database is created first; presumably dnsdist opens it while
        # loading the configuration (newLMDBKVStore), so it has to exist by then
        cls.setUpLMDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def _checkLMDBSpoofedAnswer(self, name, address):
        """
        Send an A query for `name` over UDP then TCP and check that the response
        received is the query's response carrying a single A record, with a TTL
        of 3600, set to `address`.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    address)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            # presumably nothing should have reached the backend, since every
            # rule of the configuration ends in a SpoofAction
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLMDBSource(self):
        """
        LMDB: Match on source address
        """
        self._checkLMDBSpoofedAnswer('source-ip.lmdb.tests.powerdns.com.', '5.6.7.8')

    def testLMDBQNamePlusTagLookup(self):
        """
        LMDB: Match on qname then does a second lookup using the value of the first lookup
        """
        self._checkLMDBSpoofedAnswer('qname.lmdb.tests.powerdns.com.', '1.2.3.4')

    def testLMDBSuffixLookup(self):
        """
        LMDB: Match on the qname via a suffix lookup
        """
        self._checkLMDBSpoofedAnswer('sub.sub.suffix.lmdb.tests.powerdns.com.', '42.42.42.42')

    def testLMDBQNamePlainText(self):
        """
        LMDB: Match on qname in plain text format
        """
        self._checkLMDBSpoofedAnswer('qname-plaintext.lmdb.tests.powerdns.com.', '9.10.11.12')

    def testLMDBKeyValueStoreLookupRule(self):
        """
        LMDB: KeyValueStoreLookupRule
        """
        self._checkLMDBSpoofedAnswer('kvs-rule.lmdb.tests.powerdns.com.', '13.14.15.16')
162abc50
RG
197
# skipped under the same condition as TestLMDB, since this class needs LMDB as well
@unittest.skipIf('SKIP_LMDB_TESTS' in os.environ, 'LMDB tests are disabled')
class TestLMDBIPInRange(DNSDistTest):
    """
    Range lookup in an LMDB database where the stored range is expected to
    contain the source address of the query, so the first rule should spoof
    5.6.7.8.
    """

    _lmdbFileName = '/tmp/test-lmdb-range-1-db'
    _lmdbDBName = 'db-name'
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    kvs = newLMDBKVStore('%s', '%s')

    -- KVS range lookups follow
    -- does a range lookup in the LMDB database using the source IP as key
    addAction(KeyValueStoreRangeLookupRule(kvs, KeyValueLookupKeySourceIP(32, 128, true)), SpoofAction('5.6.7.8'))

    -- otherwise, spoof a different response
    addAction(AllRule(), SpoofAction('9.9.9.9'))
    """
    _config_params = ['_testServerPort', '_lmdbFileName', '_lmdbDBName']

    @classmethod
    def setUpLMDB(cls):
        """
        Create the LMDB file and store a single range record.
        """
        # 1 MiB map: LMDB asks for a map size that is a multiple of the OS page
        # size, which the former 1014*1024 was not.
        # The environment is used as a context manager so that it is closed
        # once the record is committed, instead of leaking the handle.
        with lmdb.open(cls._lmdbFileName, map_size=1024*1024, max_dbs=1024, subdir=False) as env:
            db = env.open_db(key=cls._lmdbDBName.encode())
            with env.begin(db=db, write=True) as txn:
                # key: 127.255.255.255 followed by 255 as a 16-bit network-order integer
                # value: 127.0.0.0 followed by 0 in the same format, then the payload
                # NOTE(review): presumably key = last element of the range and
                # value prefix = first element, address + port - confirm
                # against the dnsdist range lookup documentation
                txn.put(socket.inet_aton('127.255.255.255') + struct.pack("!H", 255), socket.inet_aton('127.0.0.0') + struct.pack("!H", 0) + b'this is the value of the source address tag')

    @classmethod
    def setUpClass(cls):
        """
        Populate the LMDB database, then start the responders, dnsdist and the sockets.
        """
        # the database is created first; presumably dnsdist opens it while
        # loading the configuration (newLMDBKVStore), so it has to exist by then
        cls.setUpLMDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def testLMDBSource(self):
        """
        LMDB range: Match on source address
        """
        name = 'source-ip.lmdb-range.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '5.6.7.8')
        expectedResponse.answer.append(rrset)

        # same expectations over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            # presumably nothing should have reached the backend, since both
            # rules of the configuration end in a SpoofAction
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
255
# skipped under the same condition as TestLMDB, since this class needs LMDB as well
@unittest.skipIf('SKIP_LMDB_TESTS' in os.environ, 'LMDB tests are disabled')
class TestLMDBIPNotInRange(DNSDistTest):
    """
    Range lookup in an LMDB database where the stored range is expected NOT to
    contain the source address of the query, so the fallback rule should
    spoof 9.9.9.9.
    """

    _lmdbFileName = '/tmp/test-lmdb-range-2-db'
    _lmdbDBName = 'db-name'
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    kvs = newLMDBKVStore('%s', '%s')

    -- KVS range lookups follow
    -- does a range lookup in the LMDB database using the source IP as key
    addAction(KeyValueStoreRangeLookupRule(kvs, KeyValueLookupKeySourceIP(32, 128, true)), SpoofAction('5.6.7.8'))

    -- otherwise, spoof a different response
    addAction(AllRule(), SpoofAction('9.9.9.9'))
    """
    _config_params = ['_testServerPort', '_lmdbFileName', '_lmdbDBName']

    @classmethod
    def setUpLMDB(cls):
        """
        Create the LMDB file and store a single range record.
        """
        # 1 MiB map: LMDB asks for a map size that is a multiple of the OS page
        # size, which the former 1014*1024 was not.
        # The environment is used as a context manager so that it is closed
        # once the record is committed, instead of leaking the handle.
        with lmdb.open(cls._lmdbFileName, map_size=1024*1024, max_dbs=1024, subdir=False) as env:
            db = env.open_db(key=cls._lmdbDBName.encode())
            with env.begin(db=db, write=True) as txn:
                # key: 127.0.0.0 followed by 255 as a 16-bit network-order integer
                # value: 127.0.0.0 followed by 0 in the same format, then the payload
                # NOTE(review): presumably key = last element of the range and
                # value prefix = first element, address + port, which would
                # leave 127.0.0.1 outside of the range - confirm against the
                # dnsdist range lookup documentation
                txn.put(socket.inet_aton('127.0.0.0') + struct.pack("!H", 255), socket.inet_aton('127.0.0.0') + struct.pack("!H", 0) + b'this is the value of the source address tag')

    @classmethod
    def setUpClass(cls):
        """
        Populate the LMDB database, then start the responders, dnsdist and the sockets.
        """
        # the database is created first; presumably dnsdist opens it while
        # loading the configuration (newLMDBKVStore), so it has to exist by then
        cls.setUpLMDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def testLMDBSource(self):
        """
        LMDB not in range: Match on source address
        """
        name = 'source-ip.lmdb-not-in-range.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '9.9.9.9')
        expectedResponse.answer.append(rrset)

        # same expectations over both transports
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            # presumably nothing should have reached the backend, since both
            # rules of the configuration end in a SpoofAction
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)