]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Async.py
Merge pull request #12331 from v1shnya/master
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Async.py
1 #!/usr/bin/env python
2
3 import os
4 import socket
5 import threading
6 import unittest
7 import dns
8 from dnsdisttests import DNSDistTest
9
10 def AsyncResponder(listenPath, responsePath):
11 # Make sure the socket does not already exist
12 try:
13 os.unlink(listenPath)
14 except OSError:
15 if os.path.exists(listenPath):
16 raise
17
18 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
19 try:
20 sock.bind(listenPath)
21 except socket.error as e:
22 print("Error binding in the Asynchronous responder: %s" % str(e))
23 sys.exit(1)
24
25 while True:
26 data, addr = sock.recvfrom(65535)
27 print("Got message [%d] '%s' from %s" % (len(data), data, addr))
28 if not data:
29 break
30
31 request = dns.message.from_wire(data)
32 reply = str(request.id) + ' '
33 if str(request.question[0].name).startswith('accept-then-refuse'):
34 if request.flags & dns.flags.QR:
35 reply = reply + 'refuse'
36 else:
37 reply = reply + 'accept'
38 elif str(request.question[0].name).startswith('accept-then-drop'):
39 if request.flags & dns.flags.QR:
40 reply = reply + 'drop'
41 else:
42 reply = reply + 'accept'
43 elif str(request.question[0].name).startswith('accept-then-custom'):
44 if request.flags & dns.flags.QR:
45 reply = reply + 'custom'
46 else:
47 reply = reply + 'accept'
48 elif str(request.question[0].name).startswith('timeout-then-accept'):
49 if request.flags & dns.flags.QR:
50 reply = reply + 'accept'
51 else:
52 # no response
53 continue
54 elif str(request.question[0].name).startswith('accept-then-timeout'):
55 if request.flags & dns.flags.QR:
56 # no response
57 continue
58 else:
59 reply = reply + 'accept'
60 elif str(request.question[0].name).startswith('accept'):
61 reply = reply + 'accept'
62 elif str(request.question[0].name).startswith('refuse'):
63 reply = reply + 'refuse'
64 elif str(request.question[0].name).startswith('drop'):
65 reply = reply + 'drop'
66 elif str(request.question[0].name).startswith('custom'):
67 reply = reply + 'custom'
68 elif str(request.question[0].name).startswith('timeout'):
69 # no response
70 continue
71 else:
72 reply = reply + 'invalid'
73
74 remote = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
75 remote.connect(responsePath)
76 remote.send(reply.encode())
77 print("Sent [%d] '%s' to %s" % (len(reply), reply, responsePath))
78
79 sock.close()
80
81 asyncResponderSocketPath = '/tmp/async-responder.sock'
82 dnsdistSocketPath = '/tmp/dnsdist.sock'
83 asyncResponder = threading.Thread(name='Asynchronous Responder', target=AsyncResponder, args=[asyncResponderSocketPath, dnsdistSocketPath])
84 asyncResponder.setDaemon(True)
85 asyncResponder.start()
86
87 class AsyncTests(object):
88 def testPass(self):
89 """
90 Async: Accept
91 """
92 for name in ['accept.async.tests.powerdns.com.', 'accept.tcp-only.async.tests.powerdns.com.']:
93 query = dns.message.make_query(name, 'A', 'IN')
94
95 response = dns.message.make_response(query)
96 rrset = dns.rrset.from_text(name,
97 60,
98 dns.rdataclass.IN,
99 dns.rdatatype.A,
100 '192.0.2.1')
101 response.answer.append(rrset)
102
103 for method in ("sendUDPQuery", "sendTCPQuery"):
104 sender = getattr(self, method)
105 (receivedQuery, receivedResponse) = sender(query, response)
106 receivedQuery.id = query.id
107 self.assertEqual(query, receivedQuery)
108 self.assertEqual(response, receivedResponse)
109
110 (receivedQuery, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=response, caFile=self._caCert)
111 receivedQuery.id = query.id
112 self.assertEqual(query, receivedQuery)
113 self.assertEqual(response, receivedResponse)
114
115 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
116 receivedQuery.id = query.id
117 self.assertEqual(query, receivedQuery)
118 self.assertEqual(response, receivedResponse)
119
120 def testPassCached(self):
121 """
122 Async: Accept (cached)
123 """
124 name = 'accept.cache.async.tests.powerdns.com.'
125 query = dns.message.make_query(name, 'A', 'IN')
126
127 response = dns.message.make_response(query)
128 rrset = dns.rrset.from_text(name,
129 60,
130 dns.rdataclass.IN,
131 dns.rdatatype.A,
132 '192.0.2.1')
133 response.answer.append(rrset)
134
135 for method in ("sendUDPQuery", "sendTCPQuery"):
136 # first time to fill the cache
137 sender = getattr(self, method)
138 (receivedQuery, receivedResponse) = sender(query, response)
139 receivedQuery.id = query.id
140 self.assertEqual(query, receivedQuery)
141 self.assertEqual(response, receivedResponse)
142 # second time from the cache
143 sender = getattr(self, method)
144 (_, receivedResponse) = sender(query, response=None, useQueue=False)
145 self.assertEqual(response, receivedResponse)
146
147 (_, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=None, useQueue=False, caFile=self._caCert)
148 self.assertEqual(response, receivedResponse)
149
150 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
151 receivedQuery.id = query.id
152 self.assertEqual(query, receivedQuery)
153 self.assertEqual(response, receivedResponse)
154
155 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=None, useQueue=False, caFile=self._caCert)
156 self.assertEqual(response, receivedResponse)
157
158 def testTimeoutThenAccept(self):
159 """
160 Async: Timeout then accept
161 """
162 for name in ['timeout-then-accept.async.tests.powerdns.com.', 'timeout-then-accept.tcp-only.async.tests.powerdns.com.']:
163 query = dns.message.make_query(name, 'A', 'IN')
164
165 response = dns.message.make_response(query)
166 rrset = dns.rrset.from_text(name,
167 60,
168 dns.rdataclass.IN,
169 dns.rdatatype.A,
170 '192.0.2.1')
171 response.answer.append(rrset)
172
173 for method in ("sendUDPQuery", "sendTCPQuery"):
174 sender = getattr(self, method)
175 (receivedQuery, receivedResponse) = sender(query, response)
176 receivedQuery.id = query.id
177 self.assertEqual(query, receivedQuery)
178 self.assertEqual(response, receivedResponse)
179
180 (receivedQuery, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=response, caFile=self._caCert)
181 receivedQuery.id = query.id
182 self.assertEqual(query, receivedQuery)
183 self.assertEqual(response, receivedResponse)
184
185 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
186 receivedQuery.id = query.id
187 self.assertEqual(query, receivedQuery)
188 self.assertEqual(response, receivedResponse)
189
190 def testAcceptThenTimeout(self):
191 """
192 Async: Accept then timeout
193 """
194 for name in ['accept-then-timeout.async.tests.powerdns.com.', 'accept-then-timeout.tcp-only.async.tests.powerdns.com.']:
195 query = dns.message.make_query(name, 'A', 'IN')
196
197 response = dns.message.make_response(query)
198 rrset = dns.rrset.from_text(name,
199 60,
200 dns.rdataclass.IN,
201 dns.rdatatype.A,
202 '192.0.2.1')
203 response.answer.append(rrset)
204
205 for method in ("sendUDPQuery", "sendTCPQuery"):
206 sender = getattr(self, method)
207 (receivedQuery, receivedResponse) = sender(query, response)
208 receivedQuery.id = query.id
209 self.assertEqual(query, receivedQuery)
210 self.assertEqual(response, receivedResponse)
211
212 (receivedQuery, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=response, caFile=self._caCert)
213 receivedQuery.id = query.id
214 self.assertEqual(query, receivedQuery)
215 self.assertEqual(response, receivedResponse)
216
217 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
218 receivedQuery.id = query.id
219 self.assertEqual(query, receivedQuery)
220 self.assertEqual(response, receivedResponse)
221
222 def testAcceptThenRefuse(self):
223 """
224 Async: Accept then refuse
225 """
226 for name in ['accept-then-refuse.async.tests.powerdns.com.', 'accept-then-refuse.tcp-only.async.tests.powerdns.com.']:
227 query = dns.message.make_query(name, 'A', 'IN')
228
229 response = dns.message.make_response(query)
230 rrset = dns.rrset.from_text(name,
231 60,
232 dns.rdataclass.IN,
233 dns.rdatatype.A,
234 '192.0.2.1')
235 response.answer.append(rrset)
236
237 expectedResponse = dns.message.make_response(query)
238 expectedResponse.flags |= dns.flags.RA
239 expectedResponse.set_rcode(dns.rcode.REFUSED)
240
241 for method in ("sendUDPQuery", "sendTCPQuery"):
242 sender = getattr(self, method)
243 (receivedQuery, receivedResponse) = sender(query, response)
244 receivedQuery.id = query.id
245 self.assertEqual(query, receivedQuery)
246 self.assertEqual(expectedResponse, receivedResponse)
247
248 (receivedQuery, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=response, caFile=self._caCert)
249 receivedQuery.id = query.id
250 self.assertEqual(query, receivedQuery)
251 self.assertEqual(expectedResponse, receivedResponse)
252
253 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
254 receivedQuery.id = query.id
255 self.assertEqual(query, receivedQuery)
256 self.assertEqual(expectedResponse, receivedResponse)
257
258 def testAcceptThenCustom(self):
259 """
260 Async: Accept then custom
261 """
262 for name in ['accept-then-custom.async.tests.powerdns.com.', 'accept-then-custom.tcp-only.async.tests.powerdns.com.']:
263 query = dns.message.make_query(name, 'A', 'IN')
264
265 response = dns.message.make_response(query)
266 rrset = dns.rrset.from_text(name,
267 60,
268 dns.rdataclass.IN,
269 dns.rdatatype.A,
270 '192.0.2.1')
271 response.answer.append(rrset)
272
273 # easier to get the same custom response to everyone, sorry!
274 expectedQuery = dns.message.make_query('custom.async.tests.powerdns.com.', 'A', 'IN')
275 expectedQuery.id = query.id
276 expectedResponse = dns.message.make_response(expectedQuery)
277 expectedResponse.flags |= dns.flags.RA
278 expectedResponse.set_rcode(dns.rcode.FORMERR)
279
280 for method in ("sendUDPQuery", "sendTCPQuery"):
281 sender = getattr(self, method)
282 (receivedQuery, receivedResponse) = sender(query, response)
283 receivedQuery.id = query.id
284 self.assertEqual(query, receivedQuery)
285 self.assertEqual(expectedResponse, receivedResponse)
286
287 (receivedQuery, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=response, caFile=self._caCert)
288 receivedQuery.id = query.id
289 self.assertEqual(query, receivedQuery)
290 self.assertEqual(expectedResponse, receivedResponse)
291
292 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
293 receivedQuery.id = query.id
294 self.assertEqual(query, receivedQuery)
295 self.assertEqual(expectedResponse, receivedResponse)
296
297 def testAcceptThenDrop(self):
298 """
299 Async: Accept then drop
300 """
301 for name in ['accept-then-drop.async.tests.powerdns.com.', 'accept-then-drop.tcp-only.async.tests.powerdns.com.']:
302 query = dns.message.make_query(name, 'A', 'IN')
303
304 response = dns.message.make_response(query)
305 rrset = dns.rrset.from_text(name,
306 60,
307 dns.rdataclass.IN,
308 dns.rdatatype.A,
309 '192.0.2.1')
310 response.answer.append(rrset)
311
312 for method in ("sendUDPQuery", "sendTCPQuery"):
313 sender = getattr(self, method)
314 (receivedQuery, receivedResponse) = sender(query, response)
315 receivedQuery.id = query.id
316 self.assertEqual(query, receivedQuery)
317 self.assertEqual(receivedResponse, None)
318
319 (receivedQuery, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=response, caFile=self._caCert)
320 receivedQuery.id = query.id
321 self.assertEqual(query, receivedQuery)
322 self.assertEqual(receivedResponse, None)
323
324 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
325 receivedQuery.id = query.id
326 self.assertEqual(query, receivedQuery)
327 self.assertEqual(receivedResponse, None)
328
329 def testRefused(self):
330 """
331 Async: Refused
332 """
333 name = 'refused.async.tests.powerdns.com.'
334 query = dns.message.make_query(name, 'A', 'IN')
335
336 expectedResponse = dns.message.make_response(query)
337 expectedResponse.flags |= dns.flags.RA
338 expectedResponse.set_rcode(dns.rcode.REFUSED)
339
340 for method in ("sendUDPQuery", "sendTCPQuery"):
341 sender = getattr(self, method)
342 (_, receivedResponse) = sender(query, response=None, useQueue=False)
343 self.assertTrue(receivedResponse)
344 self.assertEqual(expectedResponse, receivedResponse)
345
346 (_, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
347 self.assertEqual(expectedResponse, receivedResponse)
348
349 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=None, caFile=self._caCert, useQueue=False)
350 self.assertEqual(expectedResponse, receivedResponse)
351
352 def testDrop(self):
353 """
354 Async: Drop
355 """
356 name = 'drop.async.tests.powerdns.com.'
357 query = dns.message.make_query(name, 'A', 'IN')
358
359 for method in ("sendUDPQuery", "sendTCPQuery"):
360 sender = getattr(self, method)
361 (_, receivedResponse) = sender(query, response=None, useQueue=False)
362 self.assertEqual(receivedResponse, None)
363
364 (_, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
365 self.assertEqual(receivedResponse, None)
366
367 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=None, caFile=self._caCert, useQueue=False)
368 self.assertEqual(receivedResponse, None)
369
370 def testCustom(self):
371 """
372 Async: Custom answer
373 """
374 name = 'custom.async.tests.powerdns.com.'
375 query = dns.message.make_query(name, 'A', 'IN')
376
377 expectedResponse = dns.message.make_response(query)
378 expectedResponse.flags |= dns.flags.RA
379 expectedResponse.set_rcode(dns.rcode.FORMERR)
380
381 for method in ("sendUDPQuery", "sendTCPQuery"):
382 sender = getattr(self, method)
383 (_, receivedResponse) = sender(query, response=None, useQueue=False)
384 self.assertTrue(receivedResponse)
385 self.assertEqual(expectedResponse, receivedResponse)
386
387 (_, receivedResponse) = self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response=None, caFile=self._caCert, useQueue=False)
388 self.assertEqual(expectedResponse, receivedResponse)
389
390 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=None, caFile=self._caCert, useQueue=False)
391 self.assertEqual(expectedResponse, receivedResponse)
392
393 def testTruncation(self):
394 """
395 Async: DoH query, timeout then truncated answer over UDP, then valid over TCP and accept
396 """
397 # the query is first forwarded over UDP, leading to a TC=1 answer from the
398 # backend, then over TCP
399 name = 'timeout-then-accept.tc.async.tests.powerdns.com.'
400 query = dns.message.make_query(name, 'A', 'IN')
401 query.id = 42
402 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
403 expectedQuery.id = 42
404 response = dns.message.make_response(query)
405 rrset = dns.rrset.from_text(name,
406 3600,
407 dns.rdataclass.IN,
408 dns.rdatatype.A,
409 '127.0.0.1')
410 response.answer.append(rrset)
411
412 # first response is a TC=1
413 tcResponse = dns.message.make_response(query)
414 tcResponse.flags |= dns.flags.TC
415 self._toResponderQueue.put(tcResponse, True, 2.0)
416
417 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, response=response)
418 # first query, received by the responder over UDP
419 self.assertTrue(receivedQuery)
420 receivedQuery.id = expectedQuery.id
421 self.assertEqual(expectedQuery, receivedQuery)
422 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
423
424 # check the response
425 self.assertTrue(receivedResponse)
426 self.assertEqual(response, receivedResponse)
427
428 # check the second query, received by the responder over TCP
429 receivedQuery = self._fromResponderQueue.get(True, 2.0)
430 self.assertTrue(receivedQuery)
431 receivedQuery.id = expectedQuery.id
432 self.assertEqual(expectedQuery, receivedQuery)
433 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
434
435 @unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
436 class TestAsyncFFI(DNSDistTest, AsyncTests):
437
438 _serverKey = 'server.key'
439 _serverCert = 'server.chain'
440 _serverName = 'tls.tests.dnsdist.org'
441 _caCert = 'ca.pem'
442 _tlsServerPort = 8453
443 _dohServerPort = 8443
444 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
445
446 _config_template = """
447 newServer{address="127.0.0.1:%s", pool={'', 'cache'}}
448 newServer{address="127.0.0.1:%s", pool="tcp-only", tcpOnly=true }
449
450 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
451 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/"})
452
453 local ffi = require("ffi")
454 local C = ffi.C
455
456 local filteringTagName = 'filtering'
457 local filteringTagValue = 'pass'
458 local asyncID = 0
459
460 pc = newPacketCache(100)
461 getPool('cache'):setCache(pc)
462
463 function gotAsyncResponse(endpointID, message, from)
464
465 print('Got async response '..message)
466 local parts = {}
467 for part in message:gmatch("%%S+") do table.insert(parts, part) end
468 if #parts ~= 2 then
469 print('Invalid message')
470 return
471 end
472 local queryID = tonumber(parts[1])
473 if parts[2] == 'accept' then
474 print('accepting')
475 C.dnsdist_ffi_resume_from_async(asyncID, queryID, filteringTagName, #filteringTagName, filteringTagValue, #filteringTagValue, true)
476 return
477 end
478 if parts[2] == 'refuse' then
479 print('refusing')
480 C.dnsdist_ffi_set_rcode_from_async(asyncID, queryID, DNSRCode.REFUSED, true)
481 return
482 end
483 if parts[2] == 'drop' then
484 print('dropping')
485 C.dnsdist_ffi_drop_from_async(asyncID, queryID)
486 return
487 end
488 if parts[2] == 'custom' then
489 print('sending a custom response')
490 local raw = '\\000\\000\\128\\129\\000\\001\\000\\000\\000\\000\\000\\001\\006custom\\005async\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001\\000\\000\\041\\002\\000\\000\\000\\128\\000\\000\\000'
491 C.dnsdist_ffi_set_answer_from_async(asyncID, queryID, raw, #raw)
492 return
493 end
494 end
495
496 local asyncResponderEndpoint = newNetworkEndpoint('%s')
497 local listener = newNetworkListener()
498 listener:addUnixListeningEndpoint('%s', 0, gotAsyncResponse)
499 listener:start()
500
501 function passQueryToAsyncFilter(dq)
502 print('in passQueryToAsyncFilter')
503 local timeout = 500 -- 500 ms
504
505 local queryPtr = C.dnsdist_ffi_dnsquestion_get_header(dq)
506 local querySize = C.dnsdist_ffi_dnsquestion_get_len(dq)
507
508 -- we need to take a copy, as we can no longer touch that data after calling set_async
509 local buffer = ffi.string(queryPtr, querySize)
510
511 print(C.dnsdist_ffi_dnsquestion_set_async(dq, asyncID, C.dnsdist_ffi_dnsquestion_get_id(dq), timeout))
512 asyncResponderEndpoint:send(buffer)
513
514 return DNSAction.Allow
515 end
516
517 function passResponseToAsyncFilter(dr)
518 print('in passResponseToAsyncFilter')
519 local timeout = 500 -- 500 ms
520
521 local responsePtr = C.dnsdist_ffi_dnsquestion_get_header(dr)
522 local responseSize = C.dnsdist_ffi_dnsquestion_get_len(dr)
523
524 -- we need to take a copy, as we can no longer touch that data after calling set_async
525 local buffer = ffi.string(responsePtr, responseSize)
526
527 print(C.dnsdist_ffi_dnsresponse_set_async(dr, asyncID, C.dnsdist_ffi_dnsquestion_get_id(dr), timeout))
528 asyncResponderEndpoint:send(buffer)
529
530 return DNSResponseAction.Allow
531 end
532
533 -- this only matters for tests actually reaching the backend
534 addAction('tcp-only.async.tests.powerdns.com', PoolAction('tcp-only', false))
535 addAction('cache.async.tests.powerdns.com', PoolAction('cache', false))
536 addAction(AllRule(), LuaFFIAction(passQueryToAsyncFilter))
537 addCacheHitResponseAction(AllRule(), LuaFFIResponseAction(passResponseToAsyncFilter))
538 addResponseAction(AllRule(), LuaFFIResponseAction(passResponseToAsyncFilter))
539 """
540 _asyncResponderSocketPath = asyncResponderSocketPath
541 _dnsdistSocketPath = dnsdistSocketPath
542 _config_params = ['_testServerPort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_dohServerPort', '_serverCert', '_serverKey', '_asyncResponderSocketPath', '_dnsdistSocketPath']
543 _verboseMode = True
544
545 @unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
546 class TestAsyncLua(DNSDistTest, AsyncTests):
547
548 _serverKey = 'server.key'
549 _serverCert = 'server.chain'
550 _serverName = 'tls.tests.dnsdist.org'
551 _caCert = 'ca.pem'
552 _tlsServerPort = 8453
553 _dohServerPort = 8443
554 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
555
556 _config_template = """
557 newServer{address="127.0.0.1:%s", pool={'', 'cache'}}
558 newServer{address="127.0.0.1:%s", pool="tcp-only", tcpOnly=true }
559
560 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
561 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/"})
562
563 local filteringTagName = 'filtering'
564 local filteringTagValue = 'pass'
565 local asyncID = 0
566
567 pc = newPacketCache(100)
568 getPool('cache'):setCache(pc)
569
570 function gotAsyncResponse(endpointID, message, from)
571
572 print('Got async response '..message)
573 local parts = {}
574 for part in message:gmatch("%%S+") do
575 table.insert(parts, part)
576 end
577 if #parts ~= 2 then
578 print('Invalid message')
579 return
580 end
581 local queryID = tonumber(parts[1])
582 local asyncObject = getAsynchronousObject(asyncID, queryID)
583 if parts[2] == 'accept' then
584 print('accepting')
585 local dq = asyncObject:getDQ()
586 dq:setTag(filteringTagName, filteringTagValue)
587 asyncObject:resume()
588 return
589 end
590 if parts[2] == 'refuse' then
591 print('refusing')
592 local dq = asyncObject:getDQ()
593 asyncObject:setRCode(DNSRCode.REFUSED, true)
594 asyncObject:resume()
595 return
596 end
597 if parts[2] == 'drop' then
598 print('dropping')
599 asyncObject:drop()
600 return
601 end
602 if parts[2] == 'custom' then
603 print('sending a custom response')
604 local raw = '\\000\\000\\128\\129\\000\\001\\000\\000\\000\\000\\000\\001\\006custom\\005async\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001\\000\\000\\041\\002\\000\\000\\000\\128\\000\\000\\000'
605 local dq = asyncObject:getDQ()
606 dq:setContent(raw)
607 asyncObject:resume()
608 return
609 end
610 end
611
612 local asyncResponderEndpoint = newNetworkEndpoint('%s')
613 local listener = newNetworkListener()
614 listener:addUnixListeningEndpoint('%s', 0, gotAsyncResponse)
615 listener:start()
616
617 function passQueryToAsyncFilter(dq)
618 print('in passQueryToAsyncFilter')
619 local timeout = 500 -- 500 ms
620
621 local buffer = dq:getContent()
622 local id = dq.dh:getID()
623 dq:suspend(asyncID, id, timeout)
624 asyncResponderEndpoint:send(buffer)
625
626 return DNSAction.Allow
627 end
628
629 function passResponseToAsyncFilter(dr)
630 print('in passResponseToAsyncFilter')
631 local timeout = 500 -- 500 ms
632
633 local buffer = dr:getContent()
634 local id = dr.dh:getID()
635 dr:suspend(asyncID, id, timeout)
636 asyncResponderEndpoint:send(buffer)
637
638 return DNSResponseAction.Allow
639 end
640
641 -- this only matters for tests actually reaching the backend
642 addAction('tcp-only.async.tests.powerdns.com', PoolAction('tcp-only', false))
643 addAction('cache.async.tests.powerdns.com', PoolAction('cache', false))
644 addAction(AllRule(), LuaAction(passQueryToAsyncFilter))
645 addCacheHitResponseAction(AllRule(), LuaResponseAction(passResponseToAsyncFilter))
646 addResponseAction(AllRule(), LuaResponseAction(passResponseToAsyncFilter))
647 """
648 _asyncResponderSocketPath = asyncResponderSocketPath
649 _dnsdistSocketPath = dnsdistSocketPath
650 _config_params = ['_testServerPort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_dohServerPort', '_serverCert', '_serverKey', '_asyncResponderSocketPath', '_dnsdistSocketPath']
651 _verboseMode = True