]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Routing.py
6 from dnsdisttests
import DNSDistTest
, pickAvailablePort
8 class TestRoutingPoolRouting(DNSDistTest
):
10 _config_template
= """
11 newServer{address="127.0.0.1:%s", pool="real"}
12 addAction(SuffixMatchNodeRule("poolaction.routing.tests.powerdns.com"), PoolAction("real"))
13 -- by default PoolAction stops the processing so the second rule should not be executed
14 addAction(SuffixMatchNodeRule("poolaction.routing.tests.powerdns.com"), PoolAction("not-real"))
16 -- this time we configure PoolAction to not stop the processing
17 addAction(SuffixMatchNodeRule("poolaction-nostop.routing.tests.powerdns.com"), PoolAction("no-real", false))
18 -- so the second rule should be executed
19 addAction(SuffixMatchNodeRule("poolaction-nostop.routing.tests.powerdns.com"), PoolAction("real"))
22 def testPolicyPoolAction(self
):
24 Routing: Set pool by qname via PoolAction
26 Send an A query to "poolaction.routing.tests.powerdns.com.",
27 check that dnsdist routes the query to the "real" pool.
29 name
= 'poolaction.routing.tests.powerdns.com.'
30 query
= dns
.message
.make_query(name
, 'A', 'IN')
31 response
= dns
.message
.make_response(query
)
32 rrset
= dns
.rrset
.from_text(name
,
37 response
.answer
.append(rrset
)
39 for method
in ("sendUDPQuery", "sendTCPQuery"):
40 sender
= getattr(self
, method
)
41 (receivedQuery
, receivedResponse
) = sender(query
, response
)
42 receivedQuery
.id = query
.id
43 self
.assertEqual(query
, receivedQuery
)
44 self
.assertEqual(response
, receivedResponse
)
46 def testPolicyPoolActionNoStop(self
):
48 Routing: Set pool by qname via PoolAction (no stop)
50 name
= 'poolaction-nostop.routing.tests.powerdns.com.'
51 query
= dns
.message
.make_query(name
, 'A', 'IN')
52 response
= dns
.message
.make_response(query
)
53 rrset
= dns
.rrset
.from_text(name
,
58 response
.answer
.append(rrset
)
60 for method
in ("sendUDPQuery", "sendTCPQuery"):
61 sender
= getattr(self
, method
)
62 (receivedQuery
, receivedResponse
) = sender(query
, response
)
63 receivedQuery
.id = query
.id
64 self
.assertEqual(query
, receivedQuery
)
65 self
.assertEqual(response
, receivedResponse
)
def testDefaultPool(self):
    """
    Routing: Set pool by qname canary

    Send an A query for "notpool.routing.tests.powerdns.com." over UDP
    and then TCP, without queueing any backend response, and check that
    no response comes back from dnsdist.
    """
    qname = 'notpool.routing.tests.powerdns.com.'
    query = dns.message.make_query(qname, 'A', 'IN')

    # No backend answer is queued: the query is expected to go unanswered.
    for send in (self.sendUDPQuery, self.sendTCPQuery):
        answer = send(query, response=None, useQueue=False)[1]
        self.assertEqual(answer, None)
83 class TestRoutingQPSPoolRouting(DNSDistTest
):
84 _config_template
= """
85 newServer{address="127.0.0.1:%s", pool="regular"}
86 addAction(SuffixMatchNodeRule("qpspoolaction.routing.tests.powerdns.com"), QPSPoolAction(10, "regular"))
89 def testQPSPoolAction(self
):
91 Routing: Set pool by QPS via action
93 Send queries to "qpspoolaction.routing.tests.powerdns.com."
94 check that dnsdist does not route the query to the "regular" pool
95 when the max QPS has been reached.
98 name
= 'qpspoolaction.routing.tests.powerdns.com.'
99 query
= dns
.message
.make_query(name
, 'A', 'IN')
100 response
= dns
.message
.make_response(query
)
101 rrset
= dns
.rrset
.from_text(name
,
106 response
.answer
.append(rrset
)
108 for _
in range(maxQPS
):
109 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
110 receivedQuery
.id = query
.id
111 self
.assertEqual(query
, receivedQuery
)
112 self
.assertEqual(response
, receivedResponse
)
114 # we should now be sent to the "abuse" pool which is empty,
115 # so the queries should be dropped
116 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
117 self
.assertEqual(receivedResponse
, None)
121 # again, over TCP this time
122 for _
in range(maxQPS
):
123 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
124 receivedQuery
.id = query
.id
125 self
.assertEqual(query
, receivedQuery
)
126 self
.assertEqual(response
, receivedResponse
)
129 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
130 self
.assertEqual(receivedResponse
, None)
132 class RoundRobinTest(object):
133 def doTestRR(self
, name
):
137 Send 10 A queries to the requested name,
138 check that dnsdist routes half of it to each backend.
142 query
= dns
.message
.make_query(name
, 'A', 'IN')
143 response
= dns
.message
.make_response(query
)
144 rrset
= dns
.rrset
.from_text(name
,
149 response
.answer
.append(rrset
)
151 # the round robin counter is shared for UDP and TCP,
152 # so we need to do UDP then TCP to have a clean count
153 for _
in range(numberOfQueries
):
154 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
155 receivedQuery
.id = query
.id
156 self
.assertEqual(query
, receivedQuery
)
157 self
.assertEqual(response
, receivedResponse
)
159 for _
in range(numberOfQueries
):
160 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
161 receivedQuery
.id = query
.id
162 self
.assertEqual(query
, receivedQuery
)
163 self
.assertEqual(response
, receivedResponse
)
165 for key
in self
._responsesCounter
:
166 value
= self
._responsesCounter
[key
]
167 self
.assertEqual(value
, numberOfQueries
/ 2)
169 class TestRoutingRoundRobinLB(RoundRobinTest
, DNSDistTest
):
171 _testServer2Port
= pickAvailablePort()
172 _config_params
= ['_testServerPort', '_testServer2Port']
173 _config_template
= """
174 setServerPolicy(roundrobin)
175 s1 = newServer{address="127.0.0.1:%s"}
177 s2 = newServer{address="127.0.0.1:%s"}
def startResponders(cls):
    """Launch the UDP and TCP responder threads for both backend ports.

    Four daemon threads are created and started in a fixed order (UDP on
    the first port, UDP on the second port, TCP on the first port, TCP on
    the second port). Each thread is stored on the class under the
    attribute named in the table below and receives the port plus the two
    responder queues as arguments.
    """
    print("Launching responders..")

    # (class attribute, thread name, thread target, listening port)
    launchPlan = (
        ('_UDPResponder', 'UDP Responder', cls.UDPResponder, cls._testServerPort),
        ('_UDPResponder2', 'UDP Responder 2', cls.UDPResponder, cls._testServer2Port),
        ('_TCPResponder', 'TCP Responder', cls.TCPResponder, cls._testServerPort),
        ('_TCPResponder2', 'TCP Responder 2', cls.TCPResponder, cls._testServer2Port),
    )

    for attribute, threadName, target, port in launchPlan:
        worker = threading.Thread(name=threadName, target=target,
                                  args=[port, cls._toResponderQueue, cls._fromResponderQueue])
        setattr(cls, attribute, worker)
        worker.daemon = True
        worker.start()
203 Send 10 A queries to "rr.routing.tests.powerdns.com.",
204 check that dnsdist routes half of it to each backend.
206 self
.doTestRR('rr.routing.tests.powerdns.com.')
208 class TestRoutingRoundRobinLBViaPool(RoundRobinTest
, DNSDistTest
):
210 _testServer2Port
= pickAvailablePort()
211 _config_params
= ['_testServerPort', '_testServer2Port']
212 _config_template
= """
213 s1 = newServer{address="127.0.0.1:%d"}
215 s2 = newServer{address="127.0.0.1:%d"}
217 setPoolServerPolicy(roundrobin, '')
def startResponders(cls):
    """Launch the UDP and TCP responder threads for both backend ports.

    Four daemon threads are created and started in a fixed order (UDP on
    the first port, UDP on the second port, TCP on the first port, TCP on
    the second port). Each thread is stored on the class under the
    attribute named in the table below and receives the port plus the two
    responder queues as arguments.
    """
    print("Launching responders..")

    # (class attribute, thread name, thread target, listening port)
    launchPlan = (
        ('_UDPResponder', 'UDP Responder', cls.UDPResponder, cls._testServerPort),
        ('_UDPResponder2', 'UDP Responder 2', cls.UDPResponder, cls._testServer2Port),
        ('_TCPResponder', 'TCP Responder', cls.TCPResponder, cls._testServerPort),
        ('_TCPResponder2', 'TCP Responder 2', cls.TCPResponder, cls._testServer2Port),
    )

    for attribute, threadName, target, port in launchPlan:
        worker = threading.Thread(name=threadName, target=target,
                                  args=[port, cls._toResponderQueue, cls._fromResponderQueue])
        setattr(cls, attribute, worker)
        worker.daemon = True
        worker.start()
240 Routing: Round Robin (pool)
242 Send 10 A queries to "rr-pool.routing.tests.powerdns.com.",
243 check that dnsdist routes half of it to each backend.
245 self
.doTestRR('rr-pool.routing.tests.powerdns.com.')
247 class TestRoutingRoundRobinLBOneDown(DNSDistTest
):
249 _testServer2Port
= pickAvailablePort()
250 _config_params
= ['_testServerPort', '_testServer2Port']
251 _config_template
= """
252 setServerPolicy(roundrobin)
253 s1 = newServer{address="127.0.0.1:%s"}
255 s2 = newServer{address="127.0.0.1:%s"}
259 def testRRWithOneDown(self
):
261 Routing: Round Robin with one server down
263 Send 100 A queries to "rr.routing.tests.powerdns.com.",
264 check that dnsdist routes all of it to the only backend up.
267 name
= 'rr.routing.tests.powerdns.com.'
268 query
= dns
.message
.make_query(name
, 'A', 'IN')
269 response
= dns
.message
.make_response(query
)
270 rrset
= dns
.rrset
.from_text(name
,
275 response
.answer
.append(rrset
)
277 # the round robin counter is shared for UDP and TCP,
278 # so we need to do UDP then TCP to have a clean count
279 for _
in range(numberOfQueries
):
280 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
281 receivedQuery
.id = query
.id
282 self
.assertEqual(query
, receivedQuery
)
283 self
.assertEqual(response
, receivedResponse
)
285 for _
in range(numberOfQueries
):
286 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
287 receivedQuery
.id = query
.id
288 self
.assertEqual(query
, receivedQuery
)
289 self
.assertEqual(response
, receivedResponse
)
292 for key
in self
._responsesCounter
:
293 value
= self
._responsesCounter
[key
]
294 self
.assertTrue(value
== numberOfQueries
or value
== 0)
297 self
.assertEqual(total
, numberOfQueries
* 2)
299 class TestRoutingRoundRobinLBAllDown(DNSDistTest
):
301 _testServer2Port
= pickAvailablePort()
302 _config_params
= ['_testServerPort', '_testServer2Port']
303 _config_template
= """
304 setServerPolicy(roundrobin)
305 setRoundRobinFailOnNoServer(true)
306 s1 = newServer{address="127.0.0.1:%s"}
308 s2 = newServer{address="127.0.0.1:%s"}
312 def testRRWithAllDown(self
):
314 Routing: Round Robin with all servers down
317 name
= 'alldown.rr.routing.tests.powerdns.com.'
318 query
= dns
.message
.make_query(name
, 'A', 'IN')
319 response
= dns
.message
.make_response(query
)
320 rrset
= dns
.rrset
.from_text(name
,
325 response
.answer
.append(rrset
)
327 for method
in ("sendUDPQuery", "sendTCPQuery"):
328 sender
= getattr(self
, method
)
329 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
330 self
.assertEqual(receivedResponse
, None)
332 class TestRoutingLuaFFIPerThreadRoundRobinLB(RoundRobinTest
, DNSDistTest
):
334 _testServer2Port
= pickAvailablePort()
335 _config_params
= ['_testServerPort', '_testServer2Port']
336 _config_template
= """
337 -- otherwise we start too many TCP workers, and as each thread
338 -- uses it own counter this makes the TCP queries distribution hard to predict
339 setMaxTCPClientThreads(1)
340 setServerPolicyLuaFFIPerThread("luaffiroundrobin", [[
341 local ffi = require("ffi")
345 return function(servers_list, dq)
346 counter = counter + 1
347 return (counter %% tonumber(C.dnsdist_ffi_servers_list_get_count(servers_list)))
351 s1 = newServer{address="127.0.0.1:%s"}
353 s2 = newServer{address="127.0.0.1:%s"}
def startResponders(cls):
    """Launch the UDP and TCP responder threads for both backend ports.

    Four daemon threads are created and started in a fixed order (UDP on
    the first port, UDP on the second port, TCP on the first port, TCP on
    the second port). Each thread is stored on the class under the
    attribute named in the table below and receives the port plus the two
    responder queues as arguments.
    """
    print("Launching responders..")

    # (class attribute, thread name, thread target, listening port)
    launchPlan = (
        ('_UDPResponder', 'UDP Responder', cls.UDPResponder, cls._testServerPort),
        ('_UDPResponder2', 'UDP Responder 2', cls.UDPResponder, cls._testServer2Port),
        ('_TCPResponder', 'TCP Responder', cls.TCPResponder, cls._testServerPort),
        ('_TCPResponder2', 'TCP Responder 2', cls.TCPResponder, cls._testServer2Port),
    )

    for attribute, threadName, target, port in launchPlan:
        worker = threading.Thread(name=threadName, target=target,
                                  args=[port, cls._toResponderQueue, cls._fromResponderQueue])
        setattr(cls, attribute, worker)
        worker.daemon = True
        worker.start()
377 Routing: Round Robin (LuaFFI)
379 self
.doTestRR('rr-luaffi.routing.tests.powerdns.com.')
381 class TestRoutingCustomLuaRoundRobinLB(RoundRobinTest
, DNSDistTest
):
383 _testServer2Port
= pickAvailablePort()
384 _config_params
= ['_testServerPort', '_testServer2Port']
385 _config_template
= """
386 -- otherwise we start too many TCP workers, and as each thread
387 -- uses it own counter this makes the TCP queries distribution hard to predict
388 setMaxTCPClientThreads(1)
391 function luaroundrobin(servers_list, dq)
392 counter = counter + 1
393 return servers_list[(counter %% #servers_list)+1]
395 setServerPolicy(newServerPolicy("custom lua round robin policy", luaroundrobin))
397 s1 = newServer{address="127.0.0.1:%s"}
399 s2 = newServer{address="127.0.0.1:%s"}
def startResponders(cls):
    """Launch the UDP and TCP responder threads for both backend ports.

    Four daemon threads are created and started in a fixed order (UDP on
    the first port, UDP on the second port, TCP on the first port, TCP on
    the second port). Each thread is stored on the class under the
    attribute named in the table below and receives the port plus the two
    responder queues as arguments.
    """
    print("Launching responders..")

    # (class attribute, thread name, thread target, listening port)
    launchPlan = (
        ('_UDPResponder', 'UDP Responder', cls.UDPResponder, cls._testServerPort),
        ('_UDPResponder2', 'UDP Responder 2', cls.UDPResponder, cls._testServer2Port),
        ('_TCPResponder', 'TCP Responder', cls.TCPResponder, cls._testServerPort),
        ('_TCPResponder2', 'TCP Responder 2', cls.TCPResponder, cls._testServer2Port),
    )

    for attribute, threadName, target, port in launchPlan:
        worker = threading.Thread(name=threadName, target=target,
                                  args=[port, cls._toResponderQueue, cls._fromResponderQueue])
        setattr(cls, attribute, worker)
        worker.daemon = True
        worker.start()
423 Routing: Round Robin (Lua)
425 self
.doTestRR('rr-lua.routing.tests.powerdns.com.')
427 class TestRoutingOrder(DNSDistTest
):
429 _testServer2Port
= pickAvailablePort()
430 _config_params
= ['_testServerPort', '_testServer2Port']
431 _config_template
= """
432 setServerPolicy(firstAvailable)
433 s1 = newServer{address="127.0.0.1:%s", order=2}
435 s2 = newServer{address="127.0.0.1:%s", order=1}
def startResponders(cls):
    """Launch the UDP and TCP responder threads for both backend ports.

    Four daemon threads are created and started in a fixed order (UDP on
    the first port, UDP on the second port, TCP on the first port, TCP on
    the second port). Each thread is stored on the class under the
    attribute named in the table below and receives the port plus the two
    responder queues as arguments.
    """
    print("Launching responders..")

    # (class attribute, thread name, thread target, listening port)
    launchPlan = (
        ('_UDPResponder', 'UDP Responder', cls.UDPResponder, cls._testServerPort),
        ('_UDPResponder2', 'UDP Responder 2', cls.UDPResponder, cls._testServer2Port),
        ('_TCPResponder', 'TCP Responder', cls.TCPResponder, cls._testServerPort),
        ('_TCPResponder2', 'TCP Responder 2', cls.TCPResponder, cls._testServer2Port),
    )

    for attribute, threadName, target, port in launchPlan:
        worker = threading.Thread(name=threadName, target=target,
                                  args=[port, cls._toResponderQueue, cls._fromResponderQueue])
        setattr(cls, attribute, worker)
        worker.daemon = True
        worker.start()
459 Routing: firstAvailable policy based on 'order'
461 Send 50 A queries to "order.routing.tests.powerdns.com.",
462 check that dnsdist routes all of it to the second backend
463 because it has the lower order value.
466 name
= 'order.routing.tests.powerdns.com.'
467 query
= dns
.message
.make_query(name
, 'A', 'IN')
468 response
= dns
.message
.make_response(query
)
469 rrset
= dns
.rrset
.from_text(name
,
474 response
.answer
.append(rrset
)
476 for _
in range(numberOfQueries
):
477 for method
in ("sendUDPQuery", "sendTCPQuery"):
478 sender
= getattr(self
, method
)
479 (receivedQuery
, receivedResponse
) = sender(query
, response
)
480 receivedQuery
.id = query
.id
481 self
.assertEqual(query
, receivedQuery
)
482 self
.assertEqual(response
, receivedResponse
)
485 if 'UDP Responder' in self
._responsesCounter
:
486 self
.assertEqual(self
._responsesCounter
['UDP Responder'], 0)
487 self
.assertEqual(self
._responsesCounter
['UDP Responder 2'], numberOfQueries
)
488 if 'TCP Responder' in self
._responsesCounter
:
489 self
.assertEqual(self
._responsesCounter
['TCP Responder'], 0)
490 self
.assertEqual(self
._responsesCounter
['TCP Responder 2'], numberOfQueries
)
492 class TestFirstAvailableQPSPacketCacheHits(DNSDistTest
):
495 _testServer2Port
= pickAvailablePort()
496 _config_params
= ['_testServerPort', '_testServer2Port']
497 _config_template
= """
498 setServerPolicy(firstAvailable)
499 s1 = newServer{address="127.0.0.1:%s", order=2}
501 s2 = newServer{address="127.0.0.1:%s", order=1, qps=10}
503 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
504 getPool(""):setCache(pc)
def startResponders(cls):
    """Launch the UDP and TCP responder threads for both backend ports.

    Four daemon threads are created and started in a fixed order (UDP on
    the first port, UDP on the second port, TCP on the first port, TCP on
    the second port). Each thread is stored on the class under the
    attribute named in the table below and receives the port plus the two
    responder queues as arguments.
    """
    print("Launching responders..")

    # (class attribute, thread name, thread target, listening port)
    launchPlan = (
        ('_UDPResponder', 'UDP Responder', cls.UDPResponder, cls._testServerPort),
        ('_UDPResponder2', 'UDP Responder 2', cls.UDPResponder, cls._testServer2Port),
        ('_TCPResponder', 'TCP Responder', cls.TCPResponder, cls._testServerPort),
        ('_TCPResponder2', 'TCP Responder 2', cls.TCPResponder, cls._testServer2Port),
    )

    for attribute, threadName, target, port in launchPlan:
        worker = threading.Thread(name=threadName, target=target,
                                  args=[port, cls._toResponderQueue, cls._fromResponderQueue])
        setattr(cls, attribute, worker)
        worker.daemon = True
        worker.start()
525 def testOrderQPSCacheHits(self
):
527 Routing: firstAvailable policy with QPS limit and packet cache
529 Send 50 A queries for "order-qps-cache.routing.tests.powerdns.com.",
530 then 10 A queries for "order-qps-cache-2.routing.tests.powerdns.com." (uncached)
531 check that dnsdist routes all of the (uncached) queries to the second backend, because it has the lower order value,
532 and the QPS should only be counted for cache misses.
535 name
= 'order-qps-cache.routing.tests.powerdns.com.'
536 query
= dns
.message
.make_query(name
, 'A', 'IN')
537 response
= dns
.message
.make_response(query
)
538 rrset
= dns
.rrset
.from_text(name
,
543 response
.answer
.append(rrset
)
545 # first queries to fill the cache
546 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
547 self
.assertTrue(receivedQuery
)
548 self
.assertTrue(receivedResponse
)
549 receivedQuery
.id = query
.id
550 self
.assertEqual(query
, receivedQuery
)
551 self
.assertEqual(receivedResponse
, response
)
552 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
553 self
.assertTrue(receivedQuery
)
554 self
.assertTrue(receivedResponse
)
555 receivedQuery
.id = query
.id
556 self
.assertEqual(query
, receivedQuery
)
557 self
.assertEqual(receivedResponse
, response
)
559 for _
in range(numberOfQueries
):
560 for method
in ("sendUDPQuery", "sendTCPQuery"):
561 sender
= getattr(self
, method
)
562 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
563 self
.assertEqual(receivedResponse
, response
)
566 name
= 'order-qps-cache-2.routing.tests.powerdns.com.'
567 query
= dns
.message
.make_query(name
, 'A', 'IN')
568 response
= dns
.message
.make_response(query
)
569 rrset
= dns
.rrset
.from_text(name
,
574 response
.answer
.append(rrset
)
576 # first queries to fill the cache
577 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
578 self
.assertTrue(receivedQuery
)
579 self
.assertTrue(receivedResponse
)
580 receivedQuery
.id = query
.id
581 self
.assertEqual(query
, receivedQuery
)
582 self
.assertEqual(receivedResponse
, response
)
583 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
584 self
.assertTrue(receivedQuery
)
585 self
.assertTrue(receivedResponse
)
586 receivedQuery
.id = query
.id
587 self
.assertEqual(query
, receivedQuery
)
588 self
.assertEqual(receivedResponse
, response
)
590 for _
in range(numberOfQueries
):
591 for method
in ("sendUDPQuery", "sendTCPQuery"):
592 sender
= getattr(self
, method
)
593 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
594 self
.assertEqual(receivedResponse
, response
)
596 # 4 queries should have made it through, 2 UDP and 2 TCP
597 for k
,v
in self
._responsesCounter
.items():
601 if 'UDP Responder' in self
._responsesCounter
:
602 self
.assertEqual(self
._responsesCounter
['UDP Responder'], 0)
603 self
.assertEqual(self
._responsesCounter
['UDP Responder 2'], 2)
604 if 'TCP Responder' in self
._responsesCounter
:
605 self
.assertEqual(self
._responsesCounter
['TCP Responder'], 0)
606 self
.assertEqual(self
._responsesCounter
['TCP Responder 2'], 2)
608 class TestRoutingNoServer(DNSDistTest
):
610 _config_template
= """
611 newServer{address="127.0.0.1:%s", pool="real"}
612 setServFailWhenNoServer(true)
def testPolicyPoolNoServer(self):
    """
    Routing: No server should return ServFail

    Send an A query for "noserver.routing.tests.powerdns.com." over UDP
    and TCP without queueing any backend response, first without EDNS
    and then with EDNS, and check that dnsdist answers with SERVFAIL
    each time. For the EDNS query, also check that the DO flag is not
    set in the response and that the advertised payload size is 1232.
    """
    qname = 'noserver.routing.tests.powerdns.com.'
    transports = ("sendUDPQuery", "sendTCPQuery")

    # First pass: plain query, the answer must not carry EDNS.
    plainQuery = dns.message.make_query(qname, 'A', 'IN')
    plainExpected = dns.message.make_response(plainQuery)
    plainExpected.set_rcode(dns.rcode.SERVFAIL)

    for transport in transports:
        send = getattr(self, transport)
        answer = send(plainQuery, response=None, useQueue=False)[1]
        self.checkMessageNoEDNS(plainExpected, answer)

    # Second pass: EDNS query without the DO bit.
    ednsQuery = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
    ednsExpected = dns.message.make_response(ednsQuery, our_payload=1232)
    ednsExpected.set_rcode(dns.rcode.SERVFAIL)

    for transport in transports:
        send = getattr(self, transport)
        answer = send(ednsQuery, response=None, useQueue=False)[1]
        self.checkMessageEDNSWithoutOptions(ednsExpected, answer)
        self.assertFalse(answer.ednsflags & dns.flags.DO)
        self.assertEqual(answer.payload, 1232)
642 class TestRoutingWRandom(DNSDistTest
):
644 _testServer2Port
= pickAvailablePort()
645 _config_params
= ['_testServerPort', '_testServer2Port']
646 _config_template
= """
647 setServerPolicy(wrandom)
648 s1 = newServer{address="127.0.0.1:%s", weight=1}
650 s2 = newServer{address="127.0.0.1:%s", weight=2}
def startResponders(cls):
    """Launch the UDP and TCP responder threads for both backend ports.

    Four daemon threads are created and started in a fixed order (UDP on
    the first port, UDP on the second port, TCP on the first port, TCP on
    the second port). Each thread is stored on the class under the
    attribute named in the table below and receives the port plus the two
    responder queues as arguments.
    """
    print("Launching responders..")

    # (class attribute, thread name, thread target, listening port)
    launchPlan = (
        ('_UDPResponder', 'UDP Responder', cls.UDPResponder, cls._testServerPort),
        ('_UDPResponder2', 'UDP Responder 2', cls.UDPResponder, cls._testServer2Port),
        ('_TCPResponder', 'TCP Responder', cls.TCPResponder, cls._testServerPort),
        ('_TCPResponder2', 'TCP Responder 2', cls.TCPResponder, cls._testServer2Port),
    )

    for attribute, threadName, target, port in launchPlan:
        worker = threading.Thread(name=threadName, target=target,
                                  args=[port, cls._toResponderQueue, cls._fromResponderQueue])
        setattr(cls, attribute, worker)
        worker.daemon = True
        worker.start()
672 def testWRandom(self
):
676 Send 100 A queries to "wrandom.routing.tests.powerdns.com.",
677 check that dnsdist routes less than half to one, more to the other.
679 numberOfQueries
= 100
680 name
= 'wrandom.routing.tests.powerdns.com.'
681 query
= dns
.message
.make_query(name
, 'A', 'IN')
682 response
= dns
.message
.make_response(query
)
683 rrset
= dns
.rrset
.from_text(name
,
688 response
.answer
.append(rrset
)
690 # the counter is shared for UDP and TCP,
691 # so we need to do UDP then TCP to have a clean count
692 for _
in range(numberOfQueries
):
693 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
694 receivedQuery
.id = query
.id
695 self
.assertEqual(query
, receivedQuery
)
696 self
.assertEqual(response
, receivedResponse
)
698 for _
in range(numberOfQueries
):
699 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
700 receivedQuery
.id = query
.id
701 self
.assertEqual(query
, receivedQuery
)
702 self
.assertEqual(response
, receivedResponse
)
704 # The lower weight downstream should receive less than half the queries
705 self
.assertLess(self
._responsesCounter
['UDP Responder'], numberOfQueries
* 0.50)
706 self
.assertLess(self
._responsesCounter
['TCP Responder'], numberOfQueries
* 0.50)
708 # The higher weight downstream should receive more than half the queries
709 self
.assertGreater(self
._responsesCounter
['UDP Responder 2'], numberOfQueries
* 0.50)
710 self
.assertGreater(self
._responsesCounter
['TCP Responder 2'], numberOfQueries
* 0.50)
713 class TestRoutingHighValueWRandom(DNSDistTest
):
715 _testServer2Port
= pickAvailablePort()
716 _consoleKey
= DNSDistTest
.generateConsoleKey()
717 _consoleKeyB64
= base64
.b64encode(_consoleKey
).decode('ascii')
718 _config_params
= ['_consoleKeyB64', '_consolePort', '_testServerPort', '_testServer2Port']
719 _config_template
= """
721 controlSocket("127.0.0.1:%s")
722 setServerPolicy(wrandom)
723 s1 = newServer{address="127.0.0.1:%s", weight=2000000000}
725 s2 = newServer{address="127.0.0.1:%s", weight=2000000000}
def startResponders(cls):
    """Launch the UDP and TCP responder threads for both backend ports.

    Four daemon threads are created and started in a fixed order (UDP on
    the first port, UDP on the second port, TCP on the first port, TCP on
    the second port). Each thread is stored on the class under the
    attribute named in the table below and receives the port plus the two
    responder queues as arguments.
    """
    print("Launching responders..")

    # (class attribute, thread name, thread target, listening port)
    launchPlan = (
        ('_UDPResponder', 'UDP Responder', cls.UDPResponder, cls._testServerPort),
        ('_UDPResponder2', 'UDP Responder 2', cls.UDPResponder, cls._testServer2Port),
        ('_TCPResponder', 'TCP Responder', cls.TCPResponder, cls._testServerPort),
        ('_TCPResponder2', 'TCP Responder 2', cls.TCPResponder, cls._testServer2Port),
    )

    for attribute, threadName, target, port in launchPlan:
        worker = threading.Thread(name=threadName, target=target,
                                  args=[port, cls._toResponderQueue, cls._fromResponderQueue])
        setattr(cls, attribute, worker)
        worker.daemon = True
        worker.start()
747 def testHighValueWRandom(self
):
749 Routing: WRandom (overflow)
751 Send 100 A queries to "wrandom-overflow.routing.tests.powerdns.com.",
752 check that dnsdist routes to each downstream, rather than failing with
755 numberOfQueries
= 100
756 name
= 'wrandom-overflow.routing.tests.powerdns.com.'
757 query
= dns
.message
.make_query(name
, 'A', 'IN')
758 response
= dns
.message
.make_response(query
)
759 rrset
= dns
.rrset
.from_text(name
,
764 response
.answer
.append(rrset
)
766 # the counter is shared for UDP and TCP,
767 # so we need to do UDP then TCP to have a clean count
768 for _
in range(numberOfQueries
):
769 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
770 receivedQuery
.id = query
.id
771 self
.assertEqual(query
, receivedQuery
)
772 self
.assertEqual(response
, receivedResponse
)
774 for _
in range(numberOfQueries
):
775 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
776 receivedQuery
.id = query
.id
777 self
.assertEqual(query
, receivedQuery
)
778 self
.assertEqual(response
, receivedResponse
)
780 stats
= self
.sendConsoleCommand("dumpStats()").split()
783 # Map to a dict with every other element being the value to the previous one
784 for i
, x
in enumerate(stats
):
786 stats_dict
[x
] = stats
[i
+1]
788 # There should be no queries getting "no-policy" responses
789 self
.assertEqual(stats_dict
['no-policy'], '0')
791 # Each downstream should receive some queries, but it will be unbalanced
792 # because the sum of the weights is higher than INT_MAX.
793 # The first downstream will receive more than half the queries
794 self
.assertGreater(self
._responsesCounter
['UDP Responder'], numberOfQueries
/ 2)
795 self
.assertGreater(self
._responsesCounter
['TCP Responder'], numberOfQueries
/ 2)
797 # The second downstream will receive the remainder of the queries, but it might very well be 0
798 if 'UDP Responder 2' in self
._responsesCounter
:
799 self
.assertEqual(self
._responsesCounter
['UDP Responder 2'], numberOfQueries
- self
._responsesCounter
['UDP Responder'])
800 if 'TCP Responder 2' in self
._responsesCounter
:
801 self
.assertEqual(self
._responsesCounter
['TCP Responder 2'], numberOfQueries
- self
._responsesCounter
['TCP Responder'])