]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_CDB.py
7 from dnsdisttests
import DNSDistTest
9 class CDBTest ( DNSDistTest
):
11 _cdbFileName
= '/tmp/test-cdb-db'
13 _config_template
= """
14 newServer{address="127.0.0.1: %d "}
16 kvs = newCDBKVStore(' %s ', %d )
19 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
20 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
22 -- does a lookup in the CDB database using the qname in wire format as key, and store the result into the 'kvs-qname-result' tag
23 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyQName(), 'kvs-qname-result'))
25 -- if the value of the 'kvs-qname-result' tag is set to 'this is the value of the qname tag'
26 -- does a lookup in the CDB database using the value of the 'kvs-qname-result' tag as key, and store the result into the 'kvs-tag-result' tag
27 addAction(TagRule('kvs-qname-result', 'this is the value of the qname tag'), KeyValueStoreLookupAction(kvs, KeyValueLookupKeyTag('kvs-qname-result'), 'kvs-tag-result'))
29 -- does a lookup in the CDB database using the source IP as key, and store the result into the 'kvs-sourceip-result' tag
30 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySourceIP(), 'kvs-sourceip-result'))
32 -- does a lookup in the CDB database using the qname in wire format as key, but this time does a suffix lookup, and store the result into the 'kvs-suffix-result' tag
33 addAction(AllRule(), KeyValueStoreLookupAction(kvs, KeyValueLookupKeySuffix(), 'kvs-suffix-result'))
35 -- Now we take action based on the result of the lookups
36 -- if the value of the 'kvs-tag-result' is set to 'this is the value of the second tag', spoof a response
37 addAction(TagRule('kvs-tag-result', 'this is the value of the second tag'), SpoofAction('1.2.3.4'))
39 -- if the value of the 'kvs-suffix-result' is set to 'this is the value of the suffix tag', spoof a response
40 addAction(TagRule('kvs-suffix-result', 'this is the value of the suffix tag'), SpoofAction('42.42.42.42'))
42 -- if the value of the 'kvs-sourceip-result' is set to 'this is the value of the source address tag', spoof a response
43 addAction(TagRule('kvs-sourceip-result', 'this is the value of the source address tag'), SpoofAction('5.6.7.8'))
45 -- otherwise, spoof a different response
46 addAction(AllRule(), SpoofAction('9.9.9.9'))
48 _config_params
= [ '_testServerPort' , '_cdbFileName' , '_cdbRefreshDelay' ]
50 class TestCDBSimple ( CDBTest
):
54 shutil
. copyfile ( 'kvs.cdb.1' , cls
._ cdbFileName
)
64 print ( "Launching tests.." )
66 def testCDBSource ( self
):
68 CDB: Match on source address
70 name
= 'source-ip.cdb.tests.powerdns.com.'
71 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
72 # dnsdist set RA = RD for spoofed responses
73 query
. flags
&= ~dns
. flags
. RD
74 expectedResponse
= dns
. message
. make_response ( query
)
75 rrset
= dns
. rrset
. from_text ( name
,
80 expectedResponse
. answer
. append ( rrset
)
82 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
83 sender
= getattr ( self
, method
)
84 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
85 self
. assertFalse ( receivedQuery
)
86 self
. assertTrue ( receivedResponse
)
87 self
. assertEquals ( expectedResponse
, receivedResponse
)
89 def testCDBQNamePlusTagLookup ( self
):
91 CDB: Match on qname then does a second lookup using the value of the first lookup
93 name
= 'qname.cdb.tests.powerdns.com.'
94 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
95 # dnsdist set RA = RD for spoofed responses
96 query
. flags
&= ~dns
. flags
. RD
97 expectedResponse
= dns
. message
. make_response ( query
)
98 rrset
= dns
. rrset
. from_text ( name
,
103 expectedResponse
. answer
. append ( rrset
)
105 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
106 sender
= getattr ( self
, method
)
107 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
108 self
. assertFalse ( receivedQuery
)
109 self
. assertTrue ( receivedResponse
)
110 self
. assertEquals ( expectedResponse
, receivedResponse
)
112 def testCDBSuffixLookup ( self
):
114 CDB: Match on the qname via a suffix lookup
116 name
= 'sub.sub.suffix.cdb.tests.powerdns.com.'
117 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
118 # dnsdist set RA = RD for spoofed responses
119 query
. flags
&= ~dns
. flags
. RD
120 expectedResponse
= dns
. message
. make_response ( query
)
121 rrset
= dns
. rrset
. from_text ( name
,
126 expectedResponse
. answer
. append ( rrset
)
128 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
129 sender
= getattr ( self
, method
)
130 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
131 self
. assertFalse ( receivedQuery
)
132 self
. assertTrue ( receivedResponse
)
133 self
. assertEquals ( expectedResponse
, receivedResponse
)
135 class TestCDBReload ( CDBTest
):
139 shutil
. copyfile ( 'kvs.cdb.1' , cls
._ cdbFileName
)
145 cls
. startResponders ()
149 print ( "Launching tests.." )
151 def testCDBReload ( self
):
153 CDB: Test that the CDB is correctly reloaded
155 name
= 'reload.cdb.tests.powerdns.com.'
156 query
= dns
. message
. make_query ( name
, 'A' , 'IN' )
157 # dnsdist set RA = RD for spoofed responses
158 query
. flags
&= ~dns
. flags
. RD
159 expectedResponse
= dns
. message
. make_response ( query
)
160 rrset
= dns
. rrset
. from_text ( name
,
165 expectedResponse
. answer
. append ( rrset
)
167 # only the source address should match
168 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
169 sender
= getattr ( self
, method
)
170 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
171 self
. assertFalse ( receivedQuery
)
172 self
. assertTrue ( receivedResponse
)
173 self
. assertEquals ( expectedResponse
, receivedResponse
)
175 # switch to the second DB which has no entry for 127.0.0.1
176 shutil
. copyfile ( 'kvs.cdb.2' , self
._ cdbFileName
)
177 # wait long enough for the CDB database to be reloaded
178 time
. sleep ( self
._ cdbRefreshDelay
+ 1 )
180 expectedResponse
= dns
. message
. make_response ( query
)
181 rrset
= dns
. rrset
. from_text ( name
,
186 expectedResponse
. answer
. append ( rrset
)
188 # nothing (qname, suffix or source IP) should match
189 for method
in ( "sendUDPQuery" , "sendTCPQuery" ):
190 sender
= getattr ( self
, method
)
191 ( receivedQuery
, receivedResponse
) = sender ( query
, response
= None , useQueue
= False )
192 self
. assertFalse ( receivedQuery
)
193 self
. assertTrue ( receivedResponse
)
194 self
. assertEquals ( expectedResponse
, receivedResponse
)