# Source: thirdparty/pdns.git, regression-tests.dnsdist/test_LMDB.py (git.ipfire.org gitweb export)
import os
import socket
import struct
import unittest

import dns
import dns.flags
import dns.message
import dns.rdataclass
import dns.rdatatype
import dns.rrset
import lmdb

from dnsdisttests import DNSDistTest
@unittest.skipIf('SKIP_LMDB_TESTS' in os.environ, 'LMDB tests are disabled')
class TestLMDB(DNSDistTest):
    """Exercise dnsdist's LMDB-backed key-value store lookups.

    An LMDB database is filled with keys in several formats (qname in wire
    format, qname in plain text, source address, the value of a previous
    lookup) and the dnsdist configuration below spoofs a different A record
    depending on which lookup matched.

    NOTE(review): the extracted source had lost several lines (the method
    headers of the two class-level setup hooks, most of the setUpClass body,
    the closing quotes of the template and docstrings, and the trailing
    arguments of every ``dns.rrset.from_text`` call). Those parts were
    reconstructed and are flagged where they occur -- confirm them against
    the upstream file.
    """

    _lmdbFileName = '/tmp/test-lmdb-db'
    _lmdbDBName = 'db-name'
    # Presumably %-formatted with the attributes named in _config_params, in
    # order: backend port, LMDB file name, LMDB database name.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    kvs = newLMDBKVStore('%s', '%s')


    -- if the qname is 'kvs-rule.lmdb.tests.powerdns.com.', does a lookup in the LMDB database using the qname as key, and spoof an answer if it matches
    addAction(AndRule{QNameRule('kvs-rule.lmdb.tests.powerdns.com.'), KeyValueStoreLookupRule(kvs, KeyValueLookupKeyQName(false))}, SpoofAction('13.14.15.16'))

    -- does a lookup in the LMDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

    -- does a lookup in the LMDB database using the qname in _plain text_ format as key, and store the result into the 'kvs-plain-text-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(false), 'kvs-plain-text-result'))
    -- if the value of the 'kvs-plain-text-result' is set to 'this is the value of the plaintext tag', spoof a response
    addAction(TagRule('kvs-plain-text-result', 'this is the value of the plaintext tag'), SpoofAction('9.10.11.12'))

    -- does a lookup in the LMDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))

    -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
    -- does a lookup in the LMDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
    addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))

    -- does a lookup in the LMDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))

    -- does a lookup in the LMDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
    addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))

    -- Now we take action based on the result of the lookups
    -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
    addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))

    -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
    addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))

    -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
    addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))

    -- otherwise, spoof a different response
    addAction(AllRule(), SpoofAction('9.9.9.9'))
    """
    _config_params = ['_testServerPort', '_lmdbFileName', '_lmdbDBName']

    # NOTE(review): the decorator and `def` line were missing from the
    # extracted source; the body uses `cls`, the method name is reconstructed.
    @classmethod
    def setUpLMDB(cls):
        """Create the LMDB database and insert the records the rules look up."""
        # NOTE(review): 1014 * 1024 looks like a typo for 1024 * 1024; kept as found.
        env = lmdb.open(cls._lmdbFileName, map_size=1014 * 1024, max_dbs=1024, subdir=False)
        try:
            db = env.open_db(key=cls._lmdbDBName.encode())
            with env.begin(db=db, write=True) as txn:
                # qname in DNS wire format (length-prefixed labels)
                txn.put(b'\x05qname\x04lmdb\x05tests\x08powerdns\x03com\x00',
                        b'this is the value of the qname tag')
                # source address as 4 raw bytes
                txn.put(socket.inet_aton('127.0.0.1'),
                        b'this is the value of the source address tag')
                # the value of the qname lookup is itself a key (second lookup)
                txn.put(b'this is the value of the qname tag',
                        b'this is the value of the second tag')
                # parent name used by the suffix lookup
                txn.put(b'\x06suffix\x04lmdb\x05tests\x08powerdns\x03com\x00',
                        b'this is the value of the suffix tag')
                # plain-text qnames, stored without the trailing dot
                txn.put(b'qname-plaintext.lmdb.tests.powerdns.com',
                        b'this is the value of the plaintext tag')
                txn.put(b'kvs-rule.lmdb.tests.powerdns.com',
                        b'the value does not matter')
        finally:
            # The write transaction has committed (or aborted) by now; release
            # the environment instead of leaking it for the rest of the run.
            env.close()

    # NOTE(review): only the final print() survived extraction for this
    # method; the header and the four calls are reconstructed (the sibling
    # range-lookup classes still show `cls.startResponders()`) -- confirm.
    @classmethod
    def setUpClass(cls):
        """Populate the LMDB database, then start the responders and dnsdist."""
        cls.setUpLMDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def testLMDBSource(self):
        """
        LMDB: Match on source address
        """
        name = 'source-ip.lmdb.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): every argument after `name` was lost in extraction;
        # the address is the one the template spoofs for a source-address
        # match, the TTL/class/type are reconstructed -- confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '5.6.7.8')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            # the answer is spoofed, so nothing may reach the backend
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLMDBQNamePlusTagLookup(self):
        """
        LMDB: Match on qname then does a second lookup using the value of the first lookup
        """
        name = 'qname.lmdb.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): arguments after `name` reconstructed; the address is
        # the one the template spoofs when 'kvs-tag-result' matches -- confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLMDBSuffixLookup(self):
        """
        LMDB: Match on the qname via a suffix lookup
        """
        name = 'sub.sub.suffix.lmdb.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): arguments after `name` reconstructed; the address is
        # the one the template spoofs when 'kvs-suffix-result' matches -- confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '42.42.42.42')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLMDBQNamePlainText(self):
        """
        LMDB: Match on qname in plain text format
        """
        name = 'qname-plaintext.lmdb.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): arguments after `name` reconstructed; the address is
        # the one the template spoofs when 'kvs-plain-text-result' matches -- confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '9.10.11.12')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)

    def testLMDBKeyValueStoreLookupRule(self):
        """
        LMDB: KeyValueStoreLookupRule
        """
        name = 'kvs-rule.lmdb.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): arguments after `name` reconstructed; the address is
        # the one the template spoofs for the KeyValueStoreLookupRule -- confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '13.14.15.16')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
# Skipped under the same SKIP_LMDB_TESTS switch as the other LMDB tests: this
# class needs the lmdb module and an LMDB-enabled dnsdist just as much.
@unittest.skipIf('SKIP_LMDB_TESTS' in os.environ, 'LMDB tests are disabled')
class TestLMDBIPInRange(DNSDistTest):
    """Range lookup in LMDB keyed on the source address, matching case.

    The single record stored has the key 127.255.255.255 + port 255 and a
    value starting with 127.0.0.0 + port 0, followed by a text payload.
    Presumably the key is the upper bound and the value prefix the lower
    bound of the range, so a query from 127.0.0.1 falls inside it -- confirm
    against the dnsdist KeyValueStoreRangeLookupRule documentation.

    NOTE(review): the extracted source had lost the setup-hook headers, part
    of the setUpClass body, the closing quotes of the template and docstring
    and the trailing ``dns.rrset.from_text`` arguments; they are
    reconstructed and flagged below -- confirm against the upstream file.
    """

    _lmdbFileName = '/tmp/test-lmdb-range-1-db'
    _lmdbDBName = 'db-name'
    # Presumably %-formatted with the attributes named in _config_params, in
    # order: backend port, LMDB file name, LMDB database name.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    kvs = newLMDBKVStore('%s', '%s')

    -- KVS range lookups follow
    -- does a range lookup in the LMDB database using the source IP as key
    addAction(KeyValueStoreRangeLookupRule(kvs, KeyValueLookupKeySourceIP(32, 128, true)), SpoofAction('5.6.7.8'))

    -- otherwise, spoof a different response
    addAction(AllRule(), SpoofAction('9.9.9.9'))
    """
    _config_params = ['_testServerPort', '_lmdbFileName', '_lmdbDBName']

    # NOTE(review): the decorator and `def` line were missing from the
    # extracted source; the body uses `cls`, the method name is reconstructed.
    @classmethod
    def setUpLMDB(cls):
        """Create the LMDB database holding the single address range."""
        # NOTE(review): 1014 * 1024 looks like a typo for 1024 * 1024; kept as found.
        env = lmdb.open(cls._lmdbFileName, map_size=1014 * 1024, max_dbs=1024, subdir=False)
        try:
            db = env.open_db(key=cls._lmdbDBName.encode())
            with env.begin(db=db, write=True) as txn:
                # "!H" packs the port as a network-order unsigned 16-bit value
                txn.put(socket.inet_aton('127.255.255.255') + struct.pack("!H", 255),
                        socket.inet_aton('127.0.0.0') + struct.pack("!H", 0)
                        + b'this is the value of the source address tag')
        finally:
            # The write transaction has committed (or aborted) by now; release
            # the environment instead of leaking it for the rest of the run.
            env.close()

    # NOTE(review): only `cls.startResponders()` and the print() survived
    # extraction; the header and the other calls are reconstructed -- confirm.
    @classmethod
    def setUpClass(cls):
        """Populate the LMDB database, then start the responders and dnsdist."""
        cls.setUpLMDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def testLMDBSource(self):
        """
        LMDB range: Match on source address
        """
        name = 'source-ip.lmdb-range.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): every argument after `name` was lost in extraction;
        # the address is the one the template spoofs when the range lookup
        # matches, the TTL/class/type are reconstructed -- confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '5.6.7.8')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            # the answer is spoofed, so nothing may reach the backend
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)
# Skipped under the same SKIP_LMDB_TESTS switch as the other LMDB tests: this
# class needs the lmdb module and an LMDB-enabled dnsdist just as much.
@unittest.skipIf('SKIP_LMDB_TESTS' in os.environ, 'LMDB tests are disabled')
class TestLMDBIPNotInRange(DNSDistTest):
    """Range lookup in LMDB keyed on the source address, non-matching case.

    The single record stored has the key 127.0.0.0 + port 255 and a value
    starting with 127.0.0.0 + port 0, followed by a text payload. Presumably
    the key is the upper bound and the value prefix the lower bound of the
    range, so a query from 127.0.0.1 falls outside it and the fall-through
    rule answers -- confirm against the dnsdist KeyValueStoreRangeLookupRule
    documentation.

    NOTE(review): the extracted source had lost the setup-hook headers, part
    of the setUpClass body, the closing quotes of the template and docstring
    and the trailing ``dns.rrset.from_text`` arguments; they are
    reconstructed and flagged below -- confirm against the upstream file.
    """

    _lmdbFileName = '/tmp/test-lmdb-range-2-db'
    _lmdbDBName = 'db-name'
    # Presumably %-formatted with the attributes named in _config_params, in
    # order: backend port, LMDB file name, LMDB database name.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    kvs = newLMDBKVStore('%s', '%s')

    -- KVS range lookups follow
    -- does a range lookup in the LMDB database using the source IP as key
    addAction(KeyValueStoreRangeLookupRule(kvs, KeyValueLookupKeySourceIP(32, 128, true)), SpoofAction('5.6.7.8'))

    -- otherwise, spoof a different response
    addAction(AllRule(), SpoofAction('9.9.9.9'))
    """
    _config_params = ['_testServerPort', '_lmdbFileName', '_lmdbDBName']

    # NOTE(review): the decorator and `def` line were missing from the
    # extracted source; the body uses `cls`, the method name is reconstructed.
    @classmethod
    def setUpLMDB(cls):
        """Create the LMDB database holding the single address range."""
        # NOTE(review): 1014 * 1024 looks like a typo for 1024 * 1024; kept as found.
        env = lmdb.open(cls._lmdbFileName, map_size=1014 * 1024, max_dbs=1024, subdir=False)
        try:
            db = env.open_db(key=cls._lmdbDBName.encode())
            with env.begin(db=db, write=True) as txn:
                # "!H" packs the port as a network-order unsigned 16-bit value
                txn.put(socket.inet_aton('127.0.0.0') + struct.pack("!H", 255),
                        socket.inet_aton('127.0.0.0') + struct.pack("!H", 0)
                        + b'this is the value of the source address tag')
        finally:
            # The write transaction has committed (or aborted) by now; release
            # the environment instead of leaking it for the rest of the run.
            env.close()

    # NOTE(review): only `cls.startResponders()` and the print() survived
    # extraction; the header and the other calls are reconstructed -- confirm.
    @classmethod
    def setUpClass(cls):
        """Populate the LMDB database, then start the responders and dnsdist."""
        cls.setUpLMDB()
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    def testLMDBSource(self):
        """
        LMDB not in range: Match on source address
        """
        name = 'source-ip.lmdb-not-in-range.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): every argument after `name` was lost in extraction;
        # the address is the one the template's fall-through rule spoofs
        # (the range lookup is expected to miss), the TTL/class/type are
        # reconstructed -- confirm.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '9.9.9.9')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
            # the answer is spoofed, so nothing may reach the backend
            self.assertFalse(receivedQuery)
            self.assertTrue(receivedResponse)
            self.assertEqual(expectedResponse, receivedResponse)