]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_CDB.py
8 from dnsdisttests
import DNSDistTest
10 @unittest . skipIf ( 'SKIP_CDB_TESTS' in os
. environ
, 'CDB tests are disabled' )
11 class CDBTest ( DNSDistTest
):
13 _cdbFileName
= '/tmp/test-cdb-db'
15 _config_template
= """
16 newServer{address="127.0.0.1: %d "}
18 kvs = newCDBKVStore(' %s ', %d )
21 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
22 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
24 -- does a lookup in the CDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
25 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))
27 -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
28 -- does a lookup in the CDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
29 addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))
31 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
32 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
34 -- does a lookup in the CDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
35 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))
37 -- Now we take action based on the result of the lookups
38 -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
39 addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))
41 -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
42 addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))
44 -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
45 addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))
47 -- otherwise, spoof a different response
48 addAction(AllRule(), SpoofAction('9.9.9.9'))
50 _config_params
= [ '_testServerPort' , '_cdbFileName' , '_cdbRefreshDelay' ]
52 class TestCDBSimple ( CDBTest
):
56 shutil
. copyfile ( 'kvs.cdb.1' , cls
._ cdbFileName
)
66 print ( "Launching tests.." )
68 def testCDBSource ( self
):
70 CDB: Match on source address
72 name
= 'source-ip.cdb.tests.powerdns.com.'
73 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
74 # dnsdist set RA = RD for spoofed responses
75 query
. flags
&= ~dns
. flags
. RD
76 expectedResponse
= dns
. message
. make_response ( query
)
77 rrset
= dns
. rrset
. from_text ( name
,
82 expectedResponse
. answer
. append ( rrset
)
84 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
85 sender
= getattr ( self
, method
)
86 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
87 self
. assertFalse ( receivedQuery
)
88 self
. assertTrue ( receivedResponse
)
89 self
. assertEquals ( expectedResponse
, receivedResponse
)
91 def testCDBQNamePlusTagLookup ( self
):
93 CDB: Match on qname then does a second lookup using the value of the first lookup
95 name
= 'qname.cdb.tests.powerdns.com.'
96 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
97 # dnsdist set RA = RD for spoofed responses
98 query
. flags
&= ~dns
. flags
. RD
99 expectedResponse
= dns
. message
. make_response ( query
)
100 rrset
= dns
. rrset
. from_text ( name
,
105 expectedResponse
. answer
. append ( rrset
)
107 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
108 sender
= getattr ( self
, method
)
109 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
110 self
. assertFalse ( receivedQuery
)
111 self
. assertTrue ( receivedResponse
)
112 self
. assertEquals ( expectedResponse
, receivedResponse
)
114 def testCDBSuffixLookup ( self
):
116 CDB: Match on the qname via a suffix lookup
118 name
= 'sub.sub.suffix.cdb.tests.powerdns.com.'
119 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
120 # dnsdist set RA = RD for spoofed responses
121 query
. flags
&= ~dns
. flags
. RD
122 expectedResponse
= dns
. message
. make_response ( query
)
123 rrset
= dns
. rrset
. from_text ( name
,
128 expectedResponse
. answer
. append ( rrset
)
130 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
131 sender
= getattr ( self
, method
)
132 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
133 self
. assertFalse ( receivedQuery
)
134 self
. assertTrue ( receivedResponse
)
135 self
. assertEquals ( expectedResponse
, receivedResponse
)
137 class TestCDBReload ( CDBTest
):
141 shutil
. copyfile ( 'kvs.cdb.1' , cls
._ cdbFileName
)
147 cls
. startResponders ()
151 print ( "Launching tests.." )
153 def testCDBReload ( self
):
155 CDB: Test that the CDB is correctly reloaded
157 name
= 'reload.cdb.tests.powerdns.com.'
158 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
159 # dnsdist set RA = RD for spoofed responses
160 query
. flags
&= ~dns
. flags
. RD
161 expectedResponse
= dns
. message
. make_response ( query
)
162 rrset
= dns
. rrset
. from_text ( name
,
167 expectedResponse
. answer
. append ( rrset
)
169 # only the source address should match
170 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
171 sender
= getattr ( self
, method
)
172 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
173 self
. assertFalse ( receivedQuery
)
174 self
. assertTrue ( receivedResponse
)
175 self
. assertEquals ( expectedResponse
, receivedResponse
)
177 # switch to the second DB which has no entry for 127.0.0.1
178 shutil
. copyfile ( 'kvs.cdb.2' , self
._ cdbFileName
)
179 # wait long enough for the CDB database to be reloaded
180 time
. sleep ( self
._ cdbRefreshDelay
+ 1 )
182 expectedResponse
= dns
. message
. make_response ( query
)
183 rrset
= dns
. rrset
. from_text ( name
,
188 expectedResponse
. answer
. append ( rrset
)
190 # nothing (qname, suffix or source IP) should match
191 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
192 sender
= getattr ( self
, method
)
193 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
194 self
. assertFalse ( receivedQuery
)
195 self
. assertTrue ( receivedResponse
)
196 self
. assertEquals ( expectedResponse
, receivedResponse
)