]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_LMDB.py
7 from dnsdisttests
import DNSDistTest
9 @unittest . skipIf ( 'SKIP_LMDB_TESTS' in os
. environ
, 'LMDB tests are disabled' )
10 class TestLMDB ( DNSDistTest
):
12 _lmdbFileName
= '/tmp/test-lmdb-db'
13 _lmdbDBName
= 'db-name'
14 _config_template
= """
15 newServer{address="127.0.0.1: %d "}
17 kvs = newLMDBKVStore(' %s ', ' %s ')
20 -- if the qname is 'kvs-rule.lmdb.tests.powerdns.com.', does a lookup in the LMDB database using the qname as key, and spoof an answer if it matches
21 addAction(AndRule{QNameRule('kvs-rule.lmdb.tests.powerdns.com.'), KeyValueStoreLookupRule(kvs, KeyValueLookupKeyQName(false))}, SpoofAction('13.14.15.16'))
23 -- does a lookup in the LMDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
24 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
26 -- does a lookup in the LMDB database using the qname in _plain text_ format as key, and store the result into the 'kvs-plain-text-result' tag
27 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(false), 'kvs-plain-text-result'))
28 -- if the value of the 'kvs-plain-text-result' is set to 'this is the value of the plaintext tag', spoof a response
29 addAction(TagRule('kvs-plain-text-result', 'this is the value of the plaintext tag'), SpoofAction('9.10.11.12'))
31 -- does a lookup in the LMDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
32 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))
34 -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
35 -- does a lookup in the LMDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
36 addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))
38 -- does a lookup in the LMDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
39 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
41 -- does a lookup in the LMDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
42 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))
44 -- Now we take action based on the result of the lookups
45 -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
46 addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))
48 -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
49 addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))
51 -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
52 addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))
54 -- otherwise, spoof a different response
55 addAction(AllRule(), SpoofAction('9.9.9.9'))
57 _config_params
= [ '_testServerPort' , '_lmdbFileName' , '_lmdbDBName' ]
61 env
= lmdb
. open ( cls
._l mdbFileName
, map_size
= 1014 * 1024 , max_dbs
= 1024 , subdir
= False )
62 db
= env
. open_db ( key
= cls
._l mdbDBName
. encode ())
63 with env
. begin ( db
= db
, write
= True ) as txn
:
64 txn
. put ( b
' \x05 qname \x04 lmdb \x05 tests \x08 powerdns \x03 com \x00 ' , b
'this is the value of the qname tag' )
65 txn
. put ( socket
. inet_aton ( '127.0.0.1' ), b
'this is the value of the source address tag' )
66 txn
. put ( b
'this is the value of the qname tag' , b
'this is the value of the second tag' )
67 txn
. put ( b
' \x06 suffix \x04 lmdb \x05 tests \x08 powerdns \x03 com \x00 ' , b
'this is the value of the suffix tag' )
68 txn
. put ( b
'qname-plaintext.lmdb.tests.powerdns.com' , b
'this is the value of the plaintext tag' )
69 txn
. put ( b
'kvs-rule.lmdb.tests.powerdns.com' , b
'the value does not matter' )
79 print ( "Launching tests.." )
def testLMDBSource(self):
    """
    LMDB: Match on source address

    Sends an A query for a name that has no entry of its own in the LMDB
    database and expects the spoofed answer selected by the
    'kvs-sourceip-result' tag rule of the configuration template.
    """
    name = 'source-ip.lmdb.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist sets RA = RD for spoofed responses, so clear RD to get a
    # response without RA to compare against
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    # The database maps inet_aton('127.0.0.1') to 'this is the value of the
    # source address tag', and the configuration spoofs 5.6.7.8 when the
    # 'kvs-sourceip-result' tag holds that value.
    # Assumes the test client queries from 127.0.0.1 - TODO confirm.
    # NOTE(review): the arguments after `name` were missing from this copy
    # of the file; class/type mirror the query above, the TTL (3600) is a
    # reconstruction - confirm against the upstream test.
    rrset = dns.rrset.from_text(name,
                                3600,
                                'IN',
                                'A',
                                '5.6.7.8')
    expectedResponse.answer.append(rrset)

    # Same expectations over both transports.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
        # Presumably receivedQuery is what reached the backend; a spoofed
        # answer should leave it empty.
        self.assertFalse(receivedQuery)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse, receivedResponse)
def testLMDBQNamePlusTagLookup(self):
    """
    LMDB: Match on qname then does a second lookup using the value of the first lookup

    The wire-format qname lookup fills the 'kvs-qname-result' tag; that
    value is then used as the key of a second lookup whose result lands in
    'kvs-tag-result', which triggers the spoofed answer.
    """
    name = 'qname.lmdb.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist sets RA = RD for spoofed responses, so clear RD to get a
    # response without RA to compare against
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    # The database maps the wire-format qname to 'this is the value of the
    # qname tag', and that string to 'this is the value of the second tag';
    # the configuration spoofs 1.2.3.4 when 'kvs-tag-result' holds the
    # latter.
    # NOTE(review): the arguments after `name` were missing from this copy
    # of the file; class/type mirror the query above, the TTL (3600) is a
    # reconstruction - confirm against the upstream test.
    rrset = dns.rrset.from_text(name,
                                3600,
                                'IN',
                                'A',
                                '1.2.3.4')
    expectedResponse.answer.append(rrset)

    # Same expectations over both transports.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
        # Presumably receivedQuery is what reached the backend; a spoofed
        # answer should leave it empty.
        self.assertFalse(receivedQuery)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse, receivedResponse)
def testLMDBSuffixLookup(self):
    """
    LMDB: Match on the qname via a suffix lookup

    Queries a name two labels below 'suffix.lmdb.tests.powerdns.com.' and
    expects the spoofed answer selected by the 'kvs-suffix-result' tag rule.
    """
    name = 'sub.sub.suffix.lmdb.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist sets RA = RD for spoofed responses, so clear RD to get a
    # response without RA to compare against
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    # The database holds the wire-format key for
    # 'suffix.lmdb.tests.powerdns.com.' with the value 'this is the value
    # of the suffix tag'; the configuration spoofs 42.42.42.42 when
    # 'kvs-suffix-result' holds that value.
    # NOTE(review): the arguments after `name` were missing from this copy
    # of the file; class/type mirror the query above, the TTL (3600) is a
    # reconstruction - confirm against the upstream test.
    rrset = dns.rrset.from_text(name,
                                3600,
                                'IN',
                                'A',
                                '42.42.42.42')
    expectedResponse.answer.append(rrset)

    # Same expectations over both transports.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
        # Presumably receivedQuery is what reached the backend; a spoofed
        # answer should leave it empty.
        self.assertFalse(receivedQuery)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse, receivedResponse)
def testLMDBQNamePlainText(self):
    """
    LMDB: Match on qname in plain text format

    Uses the lookup keyed on the textual qname (KeyValueLookupKeyQName(false)
    in the configuration) and expects the spoofed answer selected by the
    'kvs-plain-text-result' tag rule.
    """
    name = 'qname-plaintext.lmdb.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist sets RA = RD for spoofed responses, so clear RD to get a
    # response without RA to compare against
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    # The database maps b'qname-plaintext.lmdb.tests.powerdns.com' (no
    # trailing dot) to 'this is the value of the plaintext tag'; the
    # configuration spoofs 9.10.11.12 when 'kvs-plain-text-result' holds
    # that value.
    # NOTE(review): the arguments after `name` were missing from this copy
    # of the file; class/type mirror the query above, the TTL (3600) is a
    # reconstruction - confirm against the upstream test.
    rrset = dns.rrset.from_text(name,
                                3600,
                                'IN',
                                'A',
                                '9.10.11.12')
    expectedResponse.answer.append(rrset)

    # Same expectations over both transports.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
        # Presumably receivedQuery is what reached the backend; a spoofed
        # answer should leave it empty.
        self.assertFalse(receivedQuery)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse, receivedResponse)
def testLMDBKeyValueStoreLookupRule(self):
    """
    LMDB: KeyValueStoreLookupRule

    Queries the one name covered by the AndRule{QNameRule,
    KeyValueStoreLookupRule} selector in the configuration and expects the
    answer spoofed by that rule.
    """
    name = 'kvs-rule.lmdb.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist sets RA = RD for spoofed responses, so clear RD to get a
    # response without RA to compare against
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    # The database contains the key b'kvs-rule.lmdb.tests.powerdns.com'
    # (its value is irrelevant); the first action of the configuration
    # spoofs 13.14.15.16 when the qname matches and the key exists.
    # NOTE(review): the arguments after `name` were missing from this copy
    # of the file; class/type mirror the query above, the TTL (3600) is a
    # reconstruction - confirm against the upstream test.
    rrset = dns.rrset.from_text(name,
                                3600,
                                'IN',
                                'A',
                                '13.14.15.16')
    expectedResponse.answer.append(rrset)

    # Same expectations over both transports.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response=None, useQueue=False)
        # Presumably receivedQuery is what reached the backend; a spoofed
        # answer should leave it empty.
        self.assertFalse(receivedQuery)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse, receivedResponse)