]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_LMDB.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_LMDB.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import lmdb
5 import os
6 import socket
7 from dnsdisttests import DNSDistTest
8
@unittest.skipIf('SKIP_LMDB_TESTS' in os.environ, 'LMDB tests are disabled')
class TestLMDB(DNSDistTest):
    """Regression tests for dnsdist key-value store lookups backed by LMDB.

    ``setUpLMDB`` writes a handful of keys into an LMDB database, and the
    dnsdist configuration below looks them up by source IP, by qname (wire
    and plain-text format), by qname suffix and by the value of a previously
    set tag.  Each test sends one query and checks which spoofed address
    comes back, which tells us which lookup matched.
    """

    # Path of the LMDB file (opened with subdir=False) and the name of the
    # database inside it; both are substituted into the configuration.
    _lmdbFileName = '/tmp/test-lmdb-db'
    _lmdbDBName = 'db-name'
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    kvs = newLMDBKVStore('%s', '%s')

    -- KVS lookups follow
    -- if the qname is 'kvs-rule.lmdb.tests.powerdns.com.', does a lookup in the LMDB database using the qname as key, and spoof an answer if it matches
    addAction(AndRule{QNameRule('kvs-rule.lmdb.tests.powerdns.com.'), KeyValueStoreLookupRule(kvs, KeyValueLookupKeyQName(false))}, SpoofAction('13.14.15.16'))

    -- does a lookup in the LMDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

    -- does a lookup in the LMDB database using the qname in _plain text_ format as key, and store the result into the 'kvs-plain-text-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(false), 'kvs-plain-text-result'))
    -- if the value of the 'kvs-plain-text-result' is set to 'this is the value of the plaintext tag', spoof a response
    addAction(TagRule('kvs-plain-text-result', 'this is the value of the plaintext tag'), SpoofAction('9.10.11.12'))

    -- does a lookup in the LMDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))

    -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
    -- does a lookup in the LMDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
    addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))

    -- does a lookup in the LMDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

    -- does a lookup in the LMDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))

    -- Now we take action based on the result of the lookups
    -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
    addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))

    -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
    addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))

    -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
    addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))

    -- otherwise, spoof a different response
    addAction(AllRule(), SpoofAction('9.9.9.9'))
    """
    # Attribute names whose values fill the %d / %s placeholders above, in order.
    _config_params = ['_testServerPort', '_lmdbFileName', '_lmdbDBName']

    @classmethod
    def setUpLMDB(cls):
        """Create the LMDB database and store the keys the rules look up."""
        # NOTE(review): 1014*1024 looks like a typo for 1024*1024; the value
        # is kept as-is because the fixture data is only a few hundred bytes.
        env = lmdb.open(cls._lmdbFileName, map_size=1014*1024, max_dbs=1024, subdir=False)
        try:
            db = env.open_db(key=cls._lmdbDBName.encode())
            # The transaction is committed when the 'with' block exits normally.
            with env.begin(db=db, write=True) as txn:
                # qname in DNS wire format (length-prefixed labels)
                txn.put(b'\x05qname\x04lmdb\x05tests\x08powerdns\x03com\x00', b'this is the value of the qname tag')
                # source address as 4 packed bytes
                txn.put(socket.inet_aton('127.0.0.1'), b'this is the value of the source address tag')
                # keyed by the value returned by the qname lookup above
                txn.put(b'this is the value of the qname tag', b'this is the value of the second tag')
                # wire-format name stored for the suffix lookup
                txn.put(b'\x06suffix\x04lmdb\x05tests\x08powerdns\x03com\x00', b'this is the value of the suffix tag')
                # plain-text qnames, stored without the trailing dot
                txn.put(b'qname-plaintext.lmdb.tests.powerdns.com', b'this is the value of the plaintext tag')
                txn.put(b'kvs-rule.lmdb.tests.powerdns.com', b'the value does not matter')
        finally:
            # Release our handle on the database file once the fixture data
            # is written instead of keeping it open for the whole test run.
            env.close()

    @classmethod
    def setUpClass(cls):
        """Populate the database, then start the responders, dnsdist and sockets."""
        # The database is created before dnsdist is started.
        cls.setUpLMDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def _checkSpoofedAnswer(self, name, address):
        """Query `name` over UDP and TCP and expect a spoofed A record.

        Builds an A/IN query for `name`, then asserts for each transport that
        no query was received by the responder and that the response equals
        one carrying a single A record for `name` (TTL 3600) set to `address`.
        """
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    address)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLMDBSource(self):
        """
        LMDB: Match on source address
        """
        self._checkSpoofedAnswer('source-ip.lmdb.tests.powerdns.com.', '5.6.7.8')

    def testLMDBQNamePlusTagLookup(self):
        """
        LMDB: Match on qname then does a second lookup using the value of the first lookup
        """
        self._checkSpoofedAnswer('qname.lmdb.tests.powerdns.com.', '1.2.3.4')

    def testLMDBSuffixLookup(self):
        """
        LMDB: Match on the qname via a suffix lookup
        """
        self._checkSpoofedAnswer('sub.sub.suffix.lmdb.tests.powerdns.com.', '42.42.42.42')

    def testLMDBQNamePlainText(self):
        """
        LMDB: Match on qname in plain text format
        """
        self._checkSpoofedAnswer('qname-plaintext.lmdb.tests.powerdns.com.', '9.10.11.12')

    def testLMDBKeyValueStoreLookupRule(self):
        """
        LMDB: KeyValueStoreLookupRule
        """
        self._checkSpoofedAnswer('kvs-rule.lmdb.tests.powerdns.com.', '13.14.15.16')