]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Routing.py
Merge pull request #13761 from rgacogne/ddist-test-ebpf
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Routing.py
1 #!/usr/bin/env python
2 import base64
3 import threading
4 import time
5 import dns
6 from dnsdisttests import DNSDistTest, pickAvailablePort
7
8 class TestRoutingPoolRouting(DNSDistTest):
9
10 _config_template = """
11 newServer{address="127.0.0.1:%s", pool="real"}
12 addAction(SuffixMatchNodeRule("poolaction.routing.tests.powerdns.com"), PoolAction("real"))
13 -- by default PoolAction stops the processing so the second rule should not be executed
14 addAction(SuffixMatchNodeRule("poolaction.routing.tests.powerdns.com"), PoolAction("not-real"))
15
16 -- this time we configure PoolAction to not stop the processing
17 addAction(SuffixMatchNodeRule("poolaction-nostop.routing.tests.powerdns.com"), PoolAction("no-real", false))
18 -- so the second rule should be executed
19 addAction(SuffixMatchNodeRule("poolaction-nostop.routing.tests.powerdns.com"), PoolAction("real"))
20 """
21
22 def testPolicyPoolAction(self):
23 """
24 Routing: Set pool by qname via PoolAction
25
26 Send an A query to "poolaction.routing.tests.powerdns.com.",
27 check that dnsdist routes the query to the "real" pool.
28 """
29 name = 'poolaction.routing.tests.powerdns.com.'
30 query = dns.message.make_query(name, 'A', 'IN')
31 response = dns.message.make_response(query)
32 rrset = dns.rrset.from_text(name,
33 60,
34 dns.rdataclass.IN,
35 dns.rdatatype.A,
36 '192.0.2.1')
37 response.answer.append(rrset)
38
39 for method in ("sendUDPQuery", "sendTCPQuery"):
40 sender = getattr(self, method)
41 (receivedQuery, receivedResponse) = sender(query, response)
42 receivedQuery.id = query.id
43 self.assertEqual(query, receivedQuery)
44 self.assertEqual(response, receivedResponse)
45
46 def testPolicyPoolActionNoStop(self):
47 """
48 Routing: Set pool by qname via PoolAction (no stop)
49 """
50 name = 'poolaction-nostop.routing.tests.powerdns.com.'
51 query = dns.message.make_query(name, 'A', 'IN')
52 response = dns.message.make_response(query)
53 rrset = dns.rrset.from_text(name,
54 60,
55 dns.rdataclass.IN,
56 dns.rdatatype.A,
57 '192.0.2.1')
58 response.answer.append(rrset)
59
60 for method in ("sendUDPQuery", "sendTCPQuery"):
61 sender = getattr(self, method)
62 (receivedQuery, receivedResponse) = sender(query, response)
63 receivedQuery.id = query.id
64 self.assertEqual(query, receivedQuery)
65 self.assertEqual(response, receivedResponse)
66
67 def testDefaultPool(self):
68 """
69 Routing: Set pool by qname canary
70
71 Send an A query to "notpool.routing.tests.powerdns.com.",
72 check that dnsdist sends no response (no servers
73 in the default pool).
74 """
75 name = 'notpool.routing.tests.powerdns.com.'
76 query = dns.message.make_query(name, 'A', 'IN')
77
78 for method in ("sendUDPQuery", "sendTCPQuery"):
79 sender = getattr(self, method)
80 (_, receivedResponse) = sender(query, response=None, useQueue=False)
81 self.assertEqual(receivedResponse, None)
82
83 class TestRoutingQPSPoolRouting(DNSDistTest):
84 _config_template = """
85 newServer{address="127.0.0.1:%s", pool="regular"}
86 addAction(SuffixMatchNodeRule("qpspoolaction.routing.tests.powerdns.com"), QPSPoolAction(10, "regular"))
87 """
88
89 def testQPSPoolAction(self):
90 """
91 Routing: Set pool by QPS via action
92
93 Send queries to "qpspoolaction.routing.tests.powerdns.com."
94 check that dnsdist does not route the query to the "regular" pool
95 when the max QPS has been reached.
96 """
97 maxQPS = 10
98 name = 'qpspoolaction.routing.tests.powerdns.com.'
99 query = dns.message.make_query(name, 'A', 'IN')
100 response = dns.message.make_response(query)
101 rrset = dns.rrset.from_text(name,
102 60,
103 dns.rdataclass.IN,
104 dns.rdatatype.A,
105 '192.0.2.1')
106 response.answer.append(rrset)
107
108 for _ in range(maxQPS):
109 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
110 receivedQuery.id = query.id
111 self.assertEqual(query, receivedQuery)
112 self.assertEqual(response, receivedResponse)
113
114 # we should now be sent to the "abuse" pool which is empty,
115 # so the queries should be dropped
116 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
117 self.assertEqual(receivedResponse, None)
118
119 time.sleep(1)
120
121 # again, over TCP this time
122 for _ in range(maxQPS):
123 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
124 receivedQuery.id = query.id
125 self.assertEqual(query, receivedQuery)
126 self.assertEqual(response, receivedResponse)
127
128
129 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
130 self.assertEqual(receivedResponse, None)
131
132
133 class TestRoutingRoundRobinLB(DNSDistTest):
134
135 _testServer2Port = pickAvailablePort()
136 _config_params = ['_testServerPort', '_testServer2Port']
137 _config_template = """
138 setServerPolicy(roundrobin)
139 s1 = newServer{address="127.0.0.1:%s"}
140 s1:setUp()
141 s2 = newServer{address="127.0.0.1:%s"}
142 s2:setUp()
143 """
144
145 @classmethod
146 def startResponders(cls):
147 print("Launching responders..")
148 cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
149 cls._UDPResponder.daemon = True
150 cls._UDPResponder.start()
151 cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
152 cls._UDPResponder2.daemon = True
153 cls._UDPResponder2.start()
154
155 cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
156 cls._TCPResponder.daemon = True
157 cls._TCPResponder.start()
158
159 cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
160 cls._TCPResponder2.daemon = True
161 cls._TCPResponder2.start()
162
163 def testRR(self):
164 """
165 Routing: Round Robin
166
167 Send 10 A queries to "rr.routing.tests.powerdns.com.",
168 check that dnsdist routes half of it to each backend.
169 """
170 numberOfQueries = 10
171 name = 'rr.routing.tests.powerdns.com.'
172 query = dns.message.make_query(name, 'A', 'IN')
173 response = dns.message.make_response(query)
174 rrset = dns.rrset.from_text(name,
175 60,
176 dns.rdataclass.IN,
177 dns.rdatatype.A,
178 '192.0.2.1')
179 response.answer.append(rrset)
180
181 # the round robin counter is shared for UDP and TCP,
182 # so we need to do UDP then TCP to have a clean count
183 for _ in range(numberOfQueries):
184 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
185 receivedQuery.id = query.id
186 self.assertEqual(query, receivedQuery)
187 self.assertEqual(response, receivedResponse)
188
189 for _ in range(numberOfQueries):
190 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
191 receivedQuery.id = query.id
192 self.assertEqual(query, receivedQuery)
193 self.assertEqual(response, receivedResponse)
194
195 for key in self._responsesCounter:
196 value = self._responsesCounter[key]
197 self.assertEqual(value, numberOfQueries / 2)
198
199 class TestRoutingRoundRobinLBOneDown(DNSDistTest):
200
201 _testServer2Port = pickAvailablePort()
202 _config_params = ['_testServerPort', '_testServer2Port']
203 _config_template = """
204 setServerPolicy(roundrobin)
205 s1 = newServer{address="127.0.0.1:%s"}
206 s1:setUp()
207 s2 = newServer{address="127.0.0.1:%s"}
208 s2:setDown()
209 """
210
211 def testRRWithOneDown(self):
212 """
213 Routing: Round Robin with one server down
214
215 Send 100 A queries to "rr.routing.tests.powerdns.com.",
216 check that dnsdist routes all of it to the only backend up.
217 """
218 numberOfQueries = 10
219 name = 'rr.routing.tests.powerdns.com.'
220 query = dns.message.make_query(name, 'A', 'IN')
221 response = dns.message.make_response(query)
222 rrset = dns.rrset.from_text(name,
223 60,
224 dns.rdataclass.IN,
225 dns.rdatatype.A,
226 '192.0.2.1')
227 response.answer.append(rrset)
228
229 # the round robin counter is shared for UDP and TCP,
230 # so we need to do UDP then TCP to have a clean count
231 for _ in range(numberOfQueries):
232 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
233 receivedQuery.id = query.id
234 self.assertEqual(query, receivedQuery)
235 self.assertEqual(response, receivedResponse)
236
237 for _ in range(numberOfQueries):
238 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
239 receivedQuery.id = query.id
240 self.assertEqual(query, receivedQuery)
241 self.assertEqual(response, receivedResponse)
242
243 total = 0
244 for key in self._responsesCounter:
245 value = self._responsesCounter[key]
246 self.assertTrue(value == numberOfQueries or value == 0)
247 total += value
248
249 self.assertEqual(total, numberOfQueries * 2)
250
251 class TestRoutingRoundRobinLBAllDown(DNSDistTest):
252
253 _testServer2Port = pickAvailablePort()
254 _config_params = ['_testServerPort', '_testServer2Port']
255 _config_template = """
256 setServerPolicy(roundrobin)
257 setRoundRobinFailOnNoServer(true)
258 s1 = newServer{address="127.0.0.1:%s"}
259 s1:setDown()
260 s2 = newServer{address="127.0.0.1:%s"}
261 s2:setDown()
262 """
263
264 def testRRWithAllDown(self):
265 """
266 Routing: Round Robin with all servers down
267 """
268 numberOfQueries = 10
269 name = 'alldown.rr.routing.tests.powerdns.com.'
270 query = dns.message.make_query(name, 'A', 'IN')
271 response = dns.message.make_response(query)
272 rrset = dns.rrset.from_text(name,
273 60,
274 dns.rdataclass.IN,
275 dns.rdatatype.A,
276 '192.0.2.1')
277 response.answer.append(rrset)
278
279 for method in ("sendUDPQuery", "sendTCPQuery"):
280 sender = getattr(self, method)
281 (_, receivedResponse) = sender(query, response=None, useQueue=False)
282 self.assertEqual(receivedResponse, None)
283
284 class TestRoutingLuaFFIPerThreadRoundRobinLB(DNSDistTest):
285
286 _testServer2Port = pickAvailablePort()
287 _config_params = ['_testServerPort', '_testServer2Port']
288 _config_template = """
289 -- otherwise we start too many TCP workers, and as each thread
290 -- uses it own counter this makes the TCP queries distribution hard to predict
291 setMaxTCPClientThreads(1)
292 setServerPolicyLuaFFIPerThread("luaffiroundrobin", [[
293 local ffi = require("ffi")
294 local C = ffi.C
295
296 local counter = 0
297 return function(servers_list, dq)
298 counter = counter + 1
299 return (counter %% tonumber(C.dnsdist_ffi_servers_list_get_count(servers_list)))
300 end
301 ]])
302
303 s1 = newServer{address="127.0.0.1:%s"}
304 s1:setUp()
305 s2 = newServer{address="127.0.0.1:%s"}
306 s2:setUp()
307 """
308
309 @classmethod
310 def startResponders(cls):
311 print("Launching responders..")
312 cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
313 cls._UDPResponder.daemon = True
314 cls._UDPResponder.start()
315 cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
316 cls._UDPResponder2.daemon = True
317 cls._UDPResponder2.start()
318
319 cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
320 cls._TCPResponder.daemon = True
321 cls._TCPResponder.start()
322
323 cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
324 cls._TCPResponder2.daemon = True
325 cls._TCPResponder2.start()
326
327 def testRR(self):
328 """
329 Routing: Round Robin
330
331 Send 10 A queries to "rr.routing.tests.powerdns.com.",
332 check that dnsdist routes half of it to each backend.
333 """
334 numberOfQueries = 10
335 name = 'rr.routing.tests.powerdns.com.'
336 query = dns.message.make_query(name, 'A', 'IN')
337 response = dns.message.make_response(query)
338 rrset = dns.rrset.from_text(name,
339 60,
340 dns.rdataclass.IN,
341 dns.rdatatype.A,
342 '192.0.2.1')
343 response.answer.append(rrset)
344
345 # the round robin counter is shared for UDP and TCP,
346 # so we need to do UDP then TCP to have a clean count
347 for _ in range(numberOfQueries):
348 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
349 receivedQuery.id = query.id
350 self.assertEqual(query, receivedQuery)
351 self.assertEqual(response, receivedResponse)
352
353 for _ in range(numberOfQueries):
354 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
355 receivedQuery.id = query.id
356 self.assertEqual(query, receivedQuery)
357 self.assertEqual(response, receivedResponse)
358
359 for key in self._responsesCounter:
360 value = self._responsesCounter[key]
361 self.assertEqual(value, numberOfQueries / 2)
362
363 class TestRoutingOrder(DNSDistTest):
364
365 _testServer2Port = pickAvailablePort()
366 _config_params = ['_testServerPort', '_testServer2Port']
367 _config_template = """
368 setServerPolicy(firstAvailable)
369 s1 = newServer{address="127.0.0.1:%s", order=2}
370 s1:setUp()
371 s2 = newServer{address="127.0.0.1:%s", order=1}
372 s2:setUp()
373 """
374
375 @classmethod
376 def startResponders(cls):
377 print("Launching responders..")
378 cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
379 cls._UDPResponder.daemon = True
380 cls._UDPResponder.start()
381 cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
382 cls._UDPResponder2.daemon = True
383 cls._UDPResponder2.start()
384
385 cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
386 cls._TCPResponder.daemon = True
387 cls._TCPResponder.start()
388
389 cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
390 cls._TCPResponder2.daemon = True
391 cls._TCPResponder2.start()
392
393 def testOrder(self):
394 """
395 Routing: firstAvailable policy based on 'order'
396
397 Send 50 A queries to "order.routing.tests.powerdns.com.",
398 check that dnsdist routes all of it to the second backend
399 because it has the lower order value.
400 """
401 numberOfQueries = 50
402 name = 'order.routing.tests.powerdns.com.'
403 query = dns.message.make_query(name, 'A', 'IN')
404 response = dns.message.make_response(query)
405 rrset = dns.rrset.from_text(name,
406 60,
407 dns.rdataclass.IN,
408 dns.rdatatype.A,
409 '192.0.2.1')
410 response.answer.append(rrset)
411
412 for _ in range(numberOfQueries):
413 for method in ("sendUDPQuery", "sendTCPQuery"):
414 sender = getattr(self, method)
415 (receivedQuery, receivedResponse) = sender(query, response)
416 receivedQuery.id = query.id
417 self.assertEqual(query, receivedQuery)
418 self.assertEqual(response, receivedResponse)
419
420 total = 0
421 if 'UDP Responder' in self._responsesCounter:
422 self.assertEqual(self._responsesCounter['UDP Responder'], 0)
423 self.assertEqual(self._responsesCounter['UDP Responder 2'], numberOfQueries)
424 if 'TCP Responder' in self._responsesCounter:
425 self.assertEqual(self._responsesCounter['TCP Responder'], 0)
426 self.assertEqual(self._responsesCounter['TCP Responder 2'], numberOfQueries)
427
428 class TestFirstAvailableQPSPacketCacheHits(DNSDistTest):
429
430 _verboseMode = True
431 _testServer2Port = pickAvailablePort()
432 _config_params = ['_testServerPort', '_testServer2Port']
433 _config_template = """
434 setServerPolicy(firstAvailable)
435 s1 = newServer{address="127.0.0.1:%s", order=2}
436 s1:setUp()
437 s2 = newServer{address="127.0.0.1:%s", order=1, qps=10}
438 s2:setUp()
439 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
440 getPool(""):setCache(pc)
441 """
442
443 @classmethod
444 def startResponders(cls):
445 print("Launching responders..")
446 cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
447 cls._UDPResponder.daemon = True
448 cls._UDPResponder.start()
449 cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
450 cls._UDPResponder2.daemon = True
451 cls._UDPResponder2.start()
452
453 cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
454 cls._TCPResponder.daemon = True
455 cls._TCPResponder.start()
456
457 cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
458 cls._TCPResponder2.daemon = True
459 cls._TCPResponder2.start()
460
461 def testOrderQPSCacheHits(self):
462 """
463 Routing: firstAvailable policy with QPS limit and packet cache
464
465 Send 50 A queries for "order-qps-cache.routing.tests.powerdns.com.",
466 then 10 A queries for "order-qps-cache-2.routing.tests.powerdns.com." (uncached)
467 check that dnsdist routes all of the (uncached) queries to the second backend, because it has the lower order value,
468 and the QPS should only be counted for cache misses.
469 """
470 numberOfQueries = 50
471 name = 'order-qps-cache.routing.tests.powerdns.com.'
472 query = dns.message.make_query(name, 'A', 'IN')
473 response = dns.message.make_response(query)
474 rrset = dns.rrset.from_text(name,
475 60,
476 dns.rdataclass.IN,
477 dns.rdatatype.A,
478 '192.0.2.1')
479 response.answer.append(rrset)
480
481 # first queries to fill the cache
482 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
483 self.assertTrue(receivedQuery)
484 self.assertTrue(receivedResponse)
485 receivedQuery.id = query.id
486 self.assertEqual(query, receivedQuery)
487 self.assertEqual(receivedResponse, response)
488 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
489 self.assertTrue(receivedQuery)
490 self.assertTrue(receivedResponse)
491 receivedQuery.id = query.id
492 self.assertEqual(query, receivedQuery)
493 self.assertEqual(receivedResponse, response)
494
495 for _ in range(numberOfQueries):
496 for method in ("sendUDPQuery", "sendTCPQuery"):
497 sender = getattr(self, method)
498 (_, receivedResponse) = sender(query, response=None, useQueue=False)
499 self.assertEqual(receivedResponse, response)
500
501 numberOfQueries = 10
502 name = 'order-qps-cache-2.routing.tests.powerdns.com.'
503 query = dns.message.make_query(name, 'A', 'IN')
504 response = dns.message.make_response(query)
505 rrset = dns.rrset.from_text(name,
506 60,
507 dns.rdataclass.IN,
508 dns.rdatatype.A,
509 '192.0.2.1')
510 response.answer.append(rrset)
511
512 # first queries to fill the cache
513 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
514 self.assertTrue(receivedQuery)
515 self.assertTrue(receivedResponse)
516 receivedQuery.id = query.id
517 self.assertEqual(query, receivedQuery)
518 self.assertEqual(receivedResponse, response)
519 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
520 self.assertTrue(receivedQuery)
521 self.assertTrue(receivedResponse)
522 receivedQuery.id = query.id
523 self.assertEqual(query, receivedQuery)
524 self.assertEqual(receivedResponse, response)
525
526 for _ in range(numberOfQueries):
527 for method in ("sendUDPQuery", "sendTCPQuery"):
528 sender = getattr(self, method)
529 (_, receivedResponse) = sender(query, response=None, useQueue=False)
530 self.assertEqual(receivedResponse, response)
531
532 # 4 queries should made it through, 2 UDP and 2 TCP
533 for k,v in self._responsesCounter.items():
534 print(k)
535 print(v)
536
537 if 'UDP Responder' in self._responsesCounter:
538 self.assertEqual(self._responsesCounter['UDP Responder'], 0)
539 self.assertEqual(self._responsesCounter['UDP Responder 2'], 2)
540 if 'TCP Responder' in self._responsesCounter:
541 self.assertEqual(self._responsesCounter['TCP Responder'], 0)
542 self.assertEqual(self._responsesCounter['TCP Responder 2'], 2)
543
544 class TestRoutingNoServer(DNSDistTest):
545
546 _config_template = """
547 newServer{address="127.0.0.1:%s", pool="real"}
548 setServFailWhenNoServer(true)
549 """
550
551 def testPolicyPoolNoServer(self):
552 """
553 Routing: No server should return ServFail
554 """
555 # without EDNS
556 name = 'noserver.routing.tests.powerdns.com.'
557 query = dns.message.make_query(name, 'A', 'IN')
558 expectedResponse = dns.message.make_response(query)
559 expectedResponse.set_rcode(dns.rcode.SERVFAIL)
560
561 for method in ("sendUDPQuery", "sendTCPQuery"):
562 sender = getattr(self, method)
563 (_, receivedResponse) = sender(query, response=None, useQueue=False)
564 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
565
566 # now with EDNS
567 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
568 expectedResponse = dns.message.make_response(query, our_payload=1232)
569 expectedResponse.set_rcode(dns.rcode.SERVFAIL)
570
571 for method in ("sendUDPQuery", "sendTCPQuery"):
572 sender = getattr(self, method)
573 (_, receivedResponse) = sender(query, response=None, useQueue=False)
574 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
575 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
576 self.assertEqual(receivedResponse.payload, 1232)
577
578 class TestRoutingWRandom(DNSDistTest):
579
580 _testServer2Port = pickAvailablePort()
581 _config_params = ['_testServerPort', '_testServer2Port']
582 _config_template = """
583 setServerPolicy(wrandom)
584 s1 = newServer{address="127.0.0.1:%s", weight=1}
585 s1:setUp()
586 s2 = newServer{address="127.0.0.1:%s", weight=2}
587 s2:setUp()
588 """
589
590 @classmethod
591 def startResponders(cls):
592 print("Launching responders..")
593 cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
594 cls._UDPResponder.daemon = True
595 cls._UDPResponder.start()
596 cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
597 cls._UDPResponder2.daemon = True
598 cls._UDPResponder2.start()
599
600 cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
601 cls._TCPResponder.daemon = True
602 cls._TCPResponder.start()
603
604 cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
605 cls._TCPResponder2.daemon = True
606 cls._TCPResponder2.start()
607
608 def testWRandom(self):
609 """
610 Routing: WRandom
611
612 Send 100 A queries to "wrandom.routing.tests.powerdns.com.",
613 check that dnsdist routes less than half to one, more to the other.
614 """
615 numberOfQueries = 100
616 name = 'wrandom.routing.tests.powerdns.com.'
617 query = dns.message.make_query(name, 'A', 'IN')
618 response = dns.message.make_response(query)
619 rrset = dns.rrset.from_text(name,
620 60,
621 dns.rdataclass.IN,
622 dns.rdatatype.A,
623 '192.0.2.1')
624 response.answer.append(rrset)
625
626 # the counter is shared for UDP and TCP,
627 # so we need to do UDP then TCP to have a clean count
628 for _ in range(numberOfQueries):
629 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
630 receivedQuery.id = query.id
631 self.assertEqual(query, receivedQuery)
632 self.assertEqual(response, receivedResponse)
633
634 for _ in range(numberOfQueries):
635 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
636 receivedQuery.id = query.id
637 self.assertEqual(query, receivedQuery)
638 self.assertEqual(response, receivedResponse)
639
640 # The lower weight downstream should receive less than half the queries
641 self.assertLess(self._responsesCounter['UDP Responder'], numberOfQueries * 0.50)
642 self.assertLess(self._responsesCounter['TCP Responder'], numberOfQueries * 0.50)
643
644 # The higher weight downstream should receive more than half the queries
645 self.assertGreater(self._responsesCounter['UDP Responder 2'], numberOfQueries * 0.50)
646 self.assertGreater(self._responsesCounter['TCP Responder 2'], numberOfQueries * 0.50)
647
648
649 class TestRoutingHighValueWRandom(DNSDistTest):
650
651 _testServer2Port = pickAvailablePort()
652 _consoleKey = DNSDistTest.generateConsoleKey()
653 _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
654 _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_testServer2Port']
655 _config_template = """
656 setKey("%s")
657 controlSocket("127.0.0.1:%s")
658 setServerPolicy(wrandom)
659 s1 = newServer{address="127.0.0.1:%s", weight=2000000000}
660 s1:setUp()
661 s2 = newServer{address="127.0.0.1:%s", weight=2000000000}
662 s2:setUp()
663 """
664
665 @classmethod
666 def startResponders(cls):
667 print("Launching responders..")
668 cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
669 cls._UDPResponder.daemon = True
670 cls._UDPResponder.start()
671 cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
672 cls._UDPResponder2.daemon = True
673 cls._UDPResponder2.start()
674
675 cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
676 cls._TCPResponder.daemon = True
677 cls._TCPResponder.start()
678
679 cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
680 cls._TCPResponder2.daemon = True
681 cls._TCPResponder2.start()
682
683 def testHighValueWRandom(self):
684 """
685 Routing: WRandom (overflow)
686
687 Send 100 A queries to "wrandom-overflow.routing.tests.powerdns.com.",
688 check that dnsdist routes to each downstream, rather than failing with
689 no-policy.
690 """
691 numberOfQueries = 100
692 name = 'wrandom-overflow.routing.tests.powerdns.com.'
693 query = dns.message.make_query(name, 'A', 'IN')
694 response = dns.message.make_response(query)
695 rrset = dns.rrset.from_text(name,
696 60,
697 dns.rdataclass.IN,
698 dns.rdatatype.A,
699 '192.0.2.1')
700 response.answer.append(rrset)
701
702 # the counter is shared for UDP and TCP,
703 # so we need to do UDP then TCP to have a clean count
704 for _ in range(numberOfQueries):
705 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
706 receivedQuery.id = query.id
707 self.assertEqual(query, receivedQuery)
708 self.assertEqual(response, receivedResponse)
709
710 for _ in range(numberOfQueries):
711 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
712 receivedQuery.id = query.id
713 self.assertEqual(query, receivedQuery)
714 self.assertEqual(response, receivedResponse)
715
716 stats = self.sendConsoleCommand("dumpStats()").split()
717 stats_dict = {}
718
719 # Map to a dict with every other element being the value to the previous one
720 for i, x in enumerate(stats):
721 if not i % 2:
722 stats_dict[x] = stats[i+1]
723
724 # There should be no queries getting "no-policy" responses
725 self.assertEqual(stats_dict['no-policy'], '0')
726
727 # Each downstream should receive some queries, but it will be unbalanced
728 # because the sum of the weights is higher than INT_MAX.
729 # The first downstream will receive more than half the queries
730 self.assertGreater(self._responsesCounter['UDP Responder'], numberOfQueries / 2)
731 self.assertGreater(self._responsesCounter['TCP Responder'], numberOfQueries / 2)
732
733 # The second downstream will receive the remainder of the queries, but it might very well be 0
734 if 'UDP Responder 2' in self._responsesCounter:
735 self.assertEqual(self._responsesCounter['UDP Responder 2'], numberOfQueries - self._responsesCounter['UDP Responder'])
736 if 'TCP Responder 2' in self._responsesCounter:
737 self.assertEqual(self._responsesCounter['TCP Responder 2'], numberOfQueries - self._responsesCounter['TCP Responder'])