]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_CDB.py
8 from dnsdisttests
import DNSDistTest
10 def writeCDB ( fname
, variant
= 1 ):
11 cdb
= cdbx
. CDB
. make ( fname
+ '.tmp' )
12 cdb
. add ( socket
. inet_aton ( f
'127.0.0. {variant} ' ), b
'this is the value of the source address tag' )
13 cdb
. add ( b
' \x05 qname \x03 cdb \x05 tests \x08 powerdns \x03 com \x00 ' , b
'this is the value of the qname tag' )
14 cdb
. add ( b
' \x06 suffix \x03 cdb \x05 tests \x08 powerdns \x03 com \x00 ' , b
'this is the value of the suffix tag' )
15 cdb
. add ( b
'this is the value of the qname tag' , b
'this is the value of the second tag' )
17 os
. rename ( fname
+ '.tmp' , fname
)
20 @unittest . skipIf ( 'SKIP_CDB_TESTS' in os
. environ
, 'CDB tests are disabled' )
21 class CDBTest ( DNSDistTest
):
23 _cdbFileName
= '/tmp/test-cdb-db'
25 _config_template
= """
26 newServer{address="127.0.0.1: %d "}
28 kvs = newCDBKVStore(' %s ', %d )
31 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
32 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
34 -- does a lookup in the CDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
35 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))
37 -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
38 -- does a lookup in the CDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
39 addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))
41 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
42 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
44 -- does a lookup in the CDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
45 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))
47 -- Now we take action based on the result of the lookups
48 -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
49 addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))
51 -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
52 addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))
54 -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
55 addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))
57 -- otherwise, spoof a different response
58 addAction(AllRule(), SpoofAction('9.9.9.9'))
60 _config_params
= [ '_testServerPort' , '_cdbFileName' , '_cdbRefreshDelay' ]
62 class TestCDBSimple ( CDBTest
):
66 writeCDB ( cls
._ cdbFileName
, 1 )
76 print ( "Launching tests.." )
78 def testCDBSource ( self
):
80 CDB: Match on source address
82 name
= 'source-ip.cdb.tests.powerdns.com.'
83 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
84 # dnsdist set RA = RD for spoofed responses
85 query
. flags
&= ~dns
. flags
. RD
86 expectedResponse
= dns
. message
. make_response ( query
)
87 rrset
= dns
. rrset
. from_text ( name
,
92 expectedResponse
. answer
. append ( rrset
)
94 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
95 sender
= getattr ( self
, method
)
96 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
97 self
. assertFalse ( receivedQuery
)
98 self
. assertTrue ( receivedResponse
)
99 self
. assertEqual ( expectedResponse
, receivedResponse
)
101 def testCDBQNamePlusTagLookup ( self
):
103 CDB: Match on qname then does a second lookup using the value of the first lookup
105 name
= 'qname.cdb.tests.powerdns.com.'
106 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
107 # dnsdist set RA = RD for spoofed responses
108 query
. flags
&= ~dns
. flags
. RD
109 expectedResponse
= dns
. message
. make_response ( query
)
110 rrset
= dns
. rrset
. from_text ( name
,
115 expectedResponse
. answer
. append ( rrset
)
117 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
118 sender
= getattr ( self
, method
)
119 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
120 self
. assertFalse ( receivedQuery
)
121 self
. assertTrue ( receivedResponse
)
122 self
. assertEqual ( expectedResponse
, receivedResponse
)
124 def testCDBSuffixLookup ( self
):
126 CDB: Match on the qname via a suffix lookup
128 name
= 'sub.sub.suffix.cdb.tests.powerdns.com.'
129 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
130 # dnsdist set RA = RD for spoofed responses
131 query
. flags
&= ~dns
. flags
. RD
132 expectedResponse
= dns
. message
. make_response ( query
)
133 rrset
= dns
. rrset
. from_text ( name
,
138 expectedResponse
. answer
. append ( rrset
)
140 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
141 sender
= getattr ( self
, method
)
142 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
143 self
. assertFalse ( receivedQuery
)
144 self
. assertTrue ( receivedResponse
)
145 self
. assertEqual ( expectedResponse
, receivedResponse
)
147 class TestCDBReload ( CDBTest
):
151 writeCDB ( cls
._ cdbFileName
, 1 )
157 cls
. startResponders ()
161 print ( "Launching tests.." )
163 def testCDBReload ( self
):
165 CDB: Test that the CDB is correctly reloaded
167 name
= 'reload.cdb.tests.powerdns.com.'
168 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
169 # dnsdist set RA = RD for spoofed responses
170 query
. flags
&= ~dns
. flags
. RD
171 expectedResponse
= dns
. message
. make_response ( query
)
172 rrset
= dns
. rrset
. from_text ( name
,
177 expectedResponse
. answer
. append ( rrset
)
179 # only the source address should match
180 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
181 sender
= getattr ( self
, method
)
182 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
183 self
. assertFalse ( receivedQuery
)
184 self
. assertTrue ( receivedResponse
)
185 self
. assertEqual ( expectedResponse
, receivedResponse
)
187 # write a new CDB which has no entry for 127.0.0.1
188 # first ensure that the mtime will change after writing
191 writeCDB ( self
._ cdbFileName
, 2 )
192 # wait long enough for the CDB database to be reloaded
193 time
. sleep ( self
._ cdbRefreshDelay
+ 1 )
195 expectedResponse
= dns
. message
. make_response ( query
)
196 rrset
= dns
. rrset
. from_text ( name
,
201 expectedResponse
. answer
. append ( rrset
)
203 # nothing (qname, suffix or source IP) should match
204 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
205 sender
= getattr ( self
, method
)
206 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
207 self
. assertFalse ( receivedQuery
)
208 self
. assertTrue ( receivedResponse
)
209 self
. assertEqual ( expectedResponse
, receivedResponse
)