]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_CDB.py
8 from dnsdisttests
import DNSDistTest
10 def writeCDB ( fname
, variant
= 1 ):
11 cdb
= cdbx
. CDB
. make ( fname
+ '.tmp' )
12 cdb
. add ( socket
. inet_aton ( f
'127.0.0. {variant} ' ), b
'this is the value of the source address tag' )
13 cdb
. add ( b
' \x05 qname \x03 cdb \x05 tests \x08 powerdns \x03 com \x00 ' , b
'this is the value of the qname tag' )
14 cdb
. add ( b
' \x06 suffix \x03 cdb \x05 tests \x08 powerdns \x03 com \x00 ' , b
'this is the value of the suffix tag' )
15 cdb
. add ( b
'this is the value of the qname tag' , b
'this is the value of the second tag' )
17 os
. rename ( fname
+ '.tmp' , fname
)
19 @unittest . skipIf ( 'SKIP_CDB_TESTS' in os
. environ
, 'CDB tests are disabled' )
20 class CDBTest ( DNSDistTest
):
22 _cdbFileName
= '/tmp/test-cdb-db'
24 _config_template
= """
25 newServer{address="127.0.0.1: %d "}
27 kvs = newCDBKVStore(' %s ', %d )
30 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
31 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
33 -- does a lookup in the CDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
34 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))
36 -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
37 -- does a lookup in the CDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
38 addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))
40 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
41 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
43 -- does a lookup in the CDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
44 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))
46 -- Now we take action based on the result of the lookups
47 -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
48 addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))
50 -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
51 addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))
53 -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
54 addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))
56 -- otherwise, spoof a different response
57 addAction(AllRule(), SpoofAction('9.9.9.9'))
59 _config_params
= [ '_testServerPort' , '_cdbFileName' , '_cdbRefreshDelay' ]
61 class TestCDBSimple ( CDBTest
):
65 writeCDB ( cls
._ cdbFileName
, 1 )
75 print ( "Launching tests.." )
77 def testCDBSource ( self
):
79 CDB: Match on source address
81 name
= 'source-ip.cdb.tests.powerdns.com.'
82 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
83 # dnsdist set RA = RD for spoofed responses
84 query
. flags
&= ~dns
. flags
. RD
85 expectedResponse
= dns
. message
. make_response ( query
)
86 rrset
= dns
. rrset
. from_text ( name
,
91 expectedResponse
. answer
. append ( rrset
)
93 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
94 sender
= getattr ( self
, method
)
95 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
96 self
. assertFalse ( receivedQuery
)
97 self
. assertTrue ( receivedResponse
)
98 self
. assertEqual ( expectedResponse
, receivedResponse
)
100 def testCDBQNamePlusTagLookup ( self
):
102 CDB: Match on qname then does a second lookup using the value of the first lookup
104 name
= 'qname.cdb.tests.powerdns.com.'
105 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
106 # dnsdist set RA = RD for spoofed responses
107 query
. flags
&= ~dns
. flags
. RD
108 expectedResponse
= dns
. message
. make_response ( query
)
109 rrset
= dns
. rrset
. from_text ( name
,
114 expectedResponse
. answer
. append ( rrset
)
116 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
117 sender
= getattr ( self
, method
)
118 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
119 self
. assertFalse ( receivedQuery
)
120 self
. assertTrue ( receivedResponse
)
121 self
. assertEqual ( expectedResponse
, receivedResponse
)
123 def testCDBSuffixLookup ( self
):
125 CDB: Match on the qname via a suffix lookup
127 name
= 'sub.sub.suffix.cdb.tests.powerdns.com.'
128 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
129 # dnsdist set RA = RD for spoofed responses
130 query
. flags
&= ~dns
. flags
. RD
131 expectedResponse
= dns
. message
. make_response ( query
)
132 rrset
= dns
. rrset
. from_text ( name
,
137 expectedResponse
. answer
. append ( rrset
)
139 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
140 sender
= getattr ( self
, method
)
141 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
142 self
. assertFalse ( receivedQuery
)
143 self
. assertTrue ( receivedResponse
)
144 self
. assertEqual ( expectedResponse
, receivedResponse
)
146 class TestCDBReload ( CDBTest
):
150 writeCDB ( cls
._ cdbFileName
, 1 )
156 cls
. startResponders ()
160 print ( "Launching tests.." )
162 def testCDBReload ( self
):
164 CDB: Test that the CDB is correctly reloaded
166 name
= 'reload.cdb.tests.powerdns.com.'
167 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
168 # dnsdist set RA = RD for spoofed responses
169 query
. flags
&= ~dns
. flags
. RD
170 expectedResponse
= dns
. message
. make_response ( query
)
171 rrset
= dns
. rrset
. from_text ( name
,
176 expectedResponse
. answer
. append ( rrset
)
178 # only the source address should match
179 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
180 sender
= getattr ( self
, method
)
181 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
182 self
. assertFalse ( receivedQuery
)
183 self
. assertTrue ( receivedResponse
)
184 self
. assertEqual ( expectedResponse
, receivedResponse
)
186 # write a new CDB which has no entry for 127.0.0.1
187 writeCDB ( self
._ cdbFileName
, 2 )
188 # wait long enough for the CDB database to be reloaded
189 time
. sleep ( self
._ cdbRefreshDelay
+ 1 )
191 expectedResponse
= dns
. message
. make_response ( query
)
192 rrset
= dns
. rrset
. from_text ( name
,
197 expectedResponse
. answer
. append ( rrset
)
199 # nothing (qname, suffix or source IP) should match
200 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
201 sender
= getattr ( self
, method
)
202 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
203 self
. assertFalse ( receivedQuery
)
204 self
. assertTrue ( receivedResponse
)
205 self
. assertEqual ( expectedResponse
, receivedResponse
)