]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Async.py
Merge pull request #13756 from rgacogne/ddist-xsk-doc-typos
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Async.py
CommitLineData
fda32c1c
RG
1#!/usr/bin/env python
2
3import os
4import socket
fda6bdcb 5import sys
fda32c1c
RG
6import threading
7import unittest
8import dns
473cc2df 9import dns.message
e7000cce
CHB
10import doqclient
11
630eb526 12from dnsdisttests import DNSDistTest, pickAvailablePort
fda32c1c
RG
13
# Verdicts sent back to dnsdist, selected by the prefix of the queried name.
# Each entry is (prefix, verdict for a query, verdict for a response); a
# verdict of None means that no reply is sent at all (the 'timeout' cases).
# Order matters: the compound 'a-then-b' prefixes have to be tried before the
# plain prefixes they begin with.
_ASYNC_VERDICTS = (
    ('accept-then-refuse', 'accept', 'refuse'),
    ('accept-then-drop', 'accept', 'drop'),
    ('accept-then-custom', 'accept', 'custom'),
    ('timeout-then-accept', None, 'accept'),
    ('accept-then-timeout', 'accept', None),
    ('accept', 'accept', 'accept'),
    ('refuse', 'refuse', 'refuse'),
    ('drop', 'drop', 'drop'),
    ('custom', 'custom', 'custom'),
    ('timeout', None, None),
)


def _asyncVerdict(qname, isResponse):
    """Return the verdict for qname, 'invalid' if no prefix matches, None to stay silent."""
    for prefix, onQuery, onResponse in _ASYNC_VERDICTS:
        if qname.startswith(prefix):
            return onResponse if isResponse else onQuery
    return 'invalid'


def AsyncResponder(listenPath, responsePath):
    """
    Answer the DNS messages received on a Unix datagram socket with a verdict.

    Binds a datagram socket to listenPath and, for every DNS message received
    on it, sends '<message ID> <verdict>' to the Unix datagram socket at
    responsePath. The verdict depends on the first question name and on
    whether the QR flag is set (see _ASYNC_VERDICTS). The loop ends when an
    empty datagram is received.

    listenPath: filesystem path to bind; an existing entry is removed first.
    responsePath: filesystem path of the socket the verdicts are sent to.

    Calls sys.exit(1) if the bind fails. Errors raised while parsing a message
    or while connecting to responsePath are not caught.
    """
    # Make sure the socket does not already exist
    try:
        os.unlink(listenPath)
    except OSError:
        if os.path.exists(listenPath):
            raise

    # the context manager closes the listening socket on every exit path
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        try:
            sock.bind(listenPath)
        except socket.error as e:
            print("Error binding in the Asynchronous responder: %s" % str(e))
            sys.exit(1)

        while True:
            data, addr = sock.recvfrom(65535)
            print("Got message [%d] '%s' from %s" % (len(data), data, addr))
            if not data:
                break

            request = dns.message.from_wire(data)
            verdict = _asyncVerdict(str(request.question[0].name),
                                    bool(request.flags & dns.flags.QR))
            if verdict is None:
                # no response
                continue

            reply = str(request.id) + ' ' + verdict

            # one short-lived socket per reply, closed as soon as the verdict
            # has been sent so that descriptors do not pile up
            with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as remote:
                remote.connect(responsePath)
                remote.send(reply.encode())
            print("Sent [%d] '%s' to %s" % (len(reply), reply, responsePath))
84
# Unix datagram socket the responder thread listens on
asyncResponderSocketPath = '/tmp/async-responder.sock'
# Unix datagram socket the responder sends its verdicts to
dnsdistSocketPath = '/tmp/dnsdist.sock'

# The responder is started as soon as this module is imported; being a daemon
# thread, it does not keep the interpreter alive at exit.
asyncResponder = threading.Thread(
    name='Asynchronous Responder',
    target=AsyncResponder,
    args=(asyncResponderSocketPath, dnsdistSocketPath),
    daemon=True,
)
asyncResponder.start()
90
class AsyncTests(object):
    """
    Test cases shared by TestAsyncFFI and TestAsyncLua.

    This class is meant to be mixed into a test class that supplies what is
    used but not defined here: the sendXXX helpers named in _asyncSenders,
    the _toResponderQueue and _fromResponderQueue queues and
    checkQueryEDNSWithoutECS().
    """
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = pickAvailablePort()
    _dohWithNGHTTP2ServerPort = pickAvailablePort()
    _dohWithH2OServerPort = pickAvailablePort()
    _dohWithNGHTTP2BaseURL = ("https://%s:%d/" % (_serverName, _dohWithNGHTTP2ServerPort))
    _dohWithH2OBaseURL = ("https://%s:%d/" % (_serverName, _dohWithH2OServerPort))
    _doqServerPort = pickAvailablePort()

    # names of the sender helpers exercised by most tests, one per transport;
    # the order is significant for testPassCached
    _asyncSenders = ("sendUDPQuery", "sendTCPQuery", "sendDOTQueryWrapper", "sendDOHWithNGHTTP2QueryWrapper", "sendDOHWithH2OQueryWrapper", "sendDOQQueryWrapper")

    def testPass(self):
        """
        Async: Accept
        """
        for name in ['accept.async.tests.powerdns.com.', 'accept.tcp-only.async.tests.powerdns.com.']:
            query = dns.message.make_query(name, 'A', 'IN')

            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '192.0.2.1')
            response.answer.append(rrset)

            for method in self._asyncSenders:
                sender = getattr(self, method)
                (receivedQuery, receivedResponse) = sender(query, response)
                receivedQuery.id = query.id
                self.assertEqual(query, receivedQuery)
                if method == 'sendDOQQueryWrapper':
                    # dnspython sets the ID to 0
                    receivedResponse.id = response.id
                self.assertEqual(response, receivedResponse)

    def testPassCached(self):
        """
        Async: Accept (cached)
        """
        name = 'accept.cache.async.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '192.0.2.1')
        response.answer.append(rrset)

        for method in self._asyncSenders:
            sender = getattr(self, method)
            if method not in ('sendDOTQueryWrapper', 'sendDOHWithH2OQueryWrapper', 'sendDOQQueryWrapper'):
                # first time to fill the cache
                # disabled for DoT since it was already filled via TCP
                # (DoH with h2o and DoQ are skipped as well, presumably
                # because an earlier transport already filled their entry)
                (receivedQuery, receivedResponse) = sender(query, response)
                receivedQuery.id = query.id
                self.assertEqual(query, receivedQuery)
                self.assertEqual(response, receivedResponse)

            # second time from the cache
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            if method == 'sendDOQQueryWrapper':
                # dnspython sets the ID to 0
                receivedResponse.id = response.id
            self.assertEqual(response, receivedResponse)

    def testTimeoutThenAccept(self):
        """
        Async: Timeout then accept
        """
        for name in ['timeout-then-accept.async.tests.powerdns.com.', 'timeout-then-accept.tcp-only.async.tests.powerdns.com.']:
            query = dns.message.make_query(name, 'A', 'IN')

            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '192.0.2.1')
            response.answer.append(rrset)

            for method in self._asyncSenders:
                sender = getattr(self, method)
                (receivedQuery, receivedResponse) = sender(query, response)
                receivedQuery.id = query.id
                self.assertEqual(query, receivedQuery)
                if method == 'sendDOQQueryWrapper':
                    # dnspython sets the ID to 0
                    receivedResponse.id = response.id
                self.assertEqual(response, receivedResponse)

    def testAcceptThenTimeout(self):
        """
        Async: Accept then timeout
        """
        for name in ['accept-then-timeout.async.tests.powerdns.com.', 'accept-then-timeout.tcp-only.async.tests.powerdns.com.']:
            query = dns.message.make_query(name, 'A', 'IN')

            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '192.0.2.1')
            response.answer.append(rrset)

            for method in self._asyncSenders:
                sender = getattr(self, method)
                (receivedQuery, receivedResponse) = sender(query, response)
                receivedQuery.id = query.id
                self.assertEqual(query, receivedQuery)
                if method == 'sendDOQQueryWrapper':
                    # dnspython sets the ID to 0
                    receivedResponse.id = response.id
                self.assertEqual(response, receivedResponse)

    def testAcceptThenRefuse(self):
        """
        Async: Accept then refuse
        """
        for name in ['accept-then-refuse.async.tests.powerdns.com.', 'accept-then-refuse.tcp-only.async.tests.powerdns.com.']:
            query = dns.message.make_query(name, 'A', 'IN')

            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '192.0.2.1')
            response.answer.append(rrset)

            expectedResponse = dns.message.make_response(query)
            expectedResponse.flags |= dns.flags.RA
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            for method in self._asyncSenders:
                sender = getattr(self, method)
                (receivedQuery, receivedResponse) = sender(query, response)
                receivedQuery.id = query.id
                self.assertEqual(query, receivedQuery)
                if method == 'sendDOQQueryWrapper':
                    # dnspython sets the ID to 0
                    receivedResponse.id = expectedResponse.id
                self.assertEqual(expectedResponse, receivedResponse)

    def testAcceptThenCustom(self):
        """
        Async: Accept then custom
        """
        for name in ['accept-then-custom.async.tests.powerdns.com.', 'accept-then-custom.tcp-only.async.tests.powerdns.com.']:
            query = dns.message.make_query(name, 'A', 'IN')

            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '192.0.2.1')
            response.answer.append(rrset)

            expectedQuery = dns.message.make_query(name, 'A', 'IN')
            expectedQuery.id = query.id
            expectedResponse = dns.message.make_response(expectedQuery)
            expectedResponse.flags |= dns.flags.RA
            expectedResponse.set_rcode(dns.rcode.FORMERR)

            for method in self._asyncSenders:
                sender = getattr(self, method)
                (receivedQuery, receivedResponse) = sender(query, response)
                receivedQuery.id = query.id
                self.assertEqual(query, receivedQuery)
                if method == 'sendDOQQueryWrapper':
                    # dnspython sets the ID to 0
                    receivedResponse.id = expectedResponse.id
                self.assertEqual(expectedResponse, receivedResponse)

    def testAcceptThenDrop(self):
        """
        Async: Accept then drop
        """
        for name in ['accept-then-drop.async.tests.powerdns.com.', 'accept-then-drop.tcp-only.async.tests.powerdns.com.']:
            query = dns.message.make_query(name, 'A', 'IN')

            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '192.0.2.1')
            response.answer.append(rrset)

            for method in self._asyncSenders:
                sender = getattr(self, method)
                queryCaptured = True
                try:
                    (receivedQuery, receivedResponse) = sender(query, response)
                except doqclient.StreamResetError:
                    # the sender raised before handing the query back, so
                    # fetch it from the responder queue if it is there
                    receivedResponse = None
                    if self._fromResponderQueue.empty():
                        # nothing to compare: do not fall back on the query
                        # left over from the previous iteration
                        queryCaptured = False
                    else:
                        receivedQuery = self._fromResponderQueue.get(True, 1.0)
                if queryCaptured:
                    receivedQuery.id = query.id
                    self.assertEqual(query, receivedQuery)
                self.assertIsNone(receivedResponse)

    def testRefused(self):
        """
        Async: Refused
        """
        name = 'refused.async.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in self._asyncSenders:
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            if method == 'sendDOQQueryWrapper':
                # dnspython sets the ID to 0
                receivedResponse.id = expectedResponse.id
            self.assertEqual(expectedResponse, receivedResponse)

    def testDrop(self):
        """
        Async: Drop
        """
        name = 'drop.async.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        for method in self._asyncSenders:
            sender = getattr(self, method)
            try:
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
            except doqclient.StreamResetError:
                # over DoQ the drop shows up as a stream reset
                receivedResponse = None
            self.assertIsNone(receivedResponse)

    def testCustom(self):
        """
        Async: Custom answer
        """
        name = 'custom.async.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.RA
        expectedResponse.set_rcode(dns.rcode.FORMERR)

        for method in self._asyncSenders:
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertTrue(receivedResponse)
            if method == 'sendDOQQueryWrapper':
                # dnspython sets the ID to 0
                receivedResponse.id = expectedResponse.id
            self.assertEqual(expectedResponse, receivedResponse)

    def testTruncation(self):
        """
        Async: DoH query, timeout then truncated answer over UDP, then valid over TCP and accept
        """
        # the query is first forwarded over UDP, leading to a TC=1 answer from the
        # backend, then over TCP

        for method in ("sendDOHWithNGHTTP2QueryWrapper", "sendDOHWithH2OQueryWrapper"):
            sender = getattr(self, method)
            name = 'timeout-then-accept.' + method + '.tc.async.tests.powerdns.com.'
            query = dns.message.make_query(name, 'A', 'IN')
            query.id = 42
            expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
            expectedQuery.id = 42
            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        3600,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '127.0.0.1')
            response.answer.append(rrset)

            # first response is a TC=1
            tcResponse = dns.message.make_response(query)
            tcResponse.flags |= dns.flags.TC
            self._toResponderQueue.put(tcResponse, True, 2.0)

            # first query, received by the responder over UDP
            (receivedQuery, receivedResponse) = sender(query, response=response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)

            # check the response
            self.assertTrue(receivedResponse)
            self.assertEqual(response, receivedResponse)

            # check the second query, received by the responder over TCP
            receivedQuery = self._fromResponderQueue.get(True, 2.0)
            self.assertTrue(receivedQuery)
            receivedQuery.id = expectedQuery.id
            self.assertEqual(expectedQuery, receivedQuery)
            self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
397
@unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
class TestAsyncFFI(DNSDistTest, AsyncTests):
    """
    Run the AsyncTests cases against a configuration using the Lua FFI API.

    The configuration sends every query and every response to the
    asynchronous responder through LuaFFIAction / LuaFFIResponseAction, and
    applies the verdict it gets back with the dnsdist_ffi_*_from_async
    functions.
    """
    # the '%d' / '%s' placeholders are filled from _config_params, in order;
    # '%%' is a literal percent sign in the resulting Lua code
    _config_template = """
    newServer{address="127.0.0.1:%d", pool={'', 'cache'}}
    newServer{address="127.0.0.1:%d", pool="tcp-only", tcpOnly=true }

    addTLSLocal("127.0.0.1:%d", "%s", "%s", { provider="openssl" })
    addDOHLocal("127.0.0.1:%d", "%s", "%s", {"/"}, {library="h2o"})
    addDOHLocal("127.0.0.1:%d", "%s", "%s", {"/"}, {library="nghttp2"})
    addDOQLocal("127.0.0.1:%d", "%s", "%s")

    local ffi = require("ffi")
    local C = ffi.C

    local filteringTagName = 'filtering'
    local filteringTagValue = 'pass'
    local asyncID = 0

    pc = newPacketCache(100)
    getPool('cache'):setCache(pc)

    local asyncObjectsMap = {}

    function gotAsyncResponse(endpointID, message, from)

      print('Got async response '..message)
      local parts = {}
      for part in message:gmatch("%%S+") do table.insert(parts, part) end
      if #parts ~= 2 then
        print('Invalid message')
        return
      end
      local queryID = tonumber(parts[1])
      local qname = asyncObjectsMap[queryID]
      if parts[2] == 'accept' then
        print('accepting')
        C.dnsdist_ffi_resume_from_async(asyncID, queryID, filteringTagName, #filteringTagName, filteringTagValue, #filteringTagValue, true)
        return
      end
      if parts[2] == 'refuse' then
        print('refusing')
        C.dnsdist_ffi_set_rcode_from_async(asyncID, queryID, DNSRCode.REFUSED, true)
        return
      end
      if parts[2] == 'drop' then
        print('dropping')
        C.dnsdist_ffi_drop_from_async(asyncID, queryID)
        return
      end
      if parts[2] == 'custom' then
        print('sending a custom response')
        local raw = nil
        if qname == string.char(6)..'custom'..string.char(5)..'async'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com' then
          raw = '\\000\\000\\128\\129\\000\\001\\000\\000\\000\\000\\000\\001\\006custom\\005async\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001\\000\\000\\041\\002\\000\\000\\000\\128\\000\\000\\000'
        elseif qname == string.char(18)..'accept-then-custom'..string.char(5)..'async'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com' then
          raw = '\\000\\000\\128\\129\\000\\001\\000\\000\\000\\000\\000\\001\\018accept-then-custom\\005async\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001\\000\\000\\041\\002\\000\\000\\000\\128\\000\\000\\000'
        elseif qname == string.char(18)..'accept-then-custom'..string.char(8)..'tcp-only'..string.char(5)..'async'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com' then
          raw = '\\000\\000\\128\\129\\000\\001\\000\\000\\000\\000\\000\\001\\018accept-then-custom\\008tcp-only\\005async\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001\\000\\000\\041\\002\\000\\000\\000\\128\\000\\000\\000'
        end

        C.dnsdist_ffi_set_answer_from_async(asyncID, queryID, raw, #raw)
        return
      end
    end

    asyncResponderEndpoint = newNetworkEndpoint('%s')
    listener = newNetworkListener()
    listener:addUnixListeningEndpoint('%s', 0, gotAsyncResponse)
    listener:start()

    function getQNameRaw(dq)
      local ret_ptr = ffi.new("char *[1]")
      local ret_ptr_param = ffi.cast("const char **", ret_ptr)
      local ret_size = ffi.new("size_t[1]")
      local ret_size_param = ffi.cast("size_t*", ret_size)
      C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, ret_ptr_param, ret_size_param)
      return ffi.string(ret_ptr[0])
    end

    function passQueryToAsyncFilter(dq)
      print('in passQueryToAsyncFilter')
      local timeout = 500 -- 500 ms

      local queryPtr = C.dnsdist_ffi_dnsquestion_get_header(dq)
      local querySize = C.dnsdist_ffi_dnsquestion_get_len(dq)

      -- we need to take a copy, as we can no longer touch that data after calling set_async
      local buffer = ffi.string(queryPtr, querySize)

      asyncObjectsMap[C.dnsdist_ffi_dnsquestion_get_id(dq)] = getQNameRaw(dq)

      C.dnsdist_ffi_dnsquestion_set_async(dq, asyncID, C.dnsdist_ffi_dnsquestion_get_id(dq), timeout)
      asyncResponderEndpoint:send(buffer)

      return DNSAction.Allow
    end

    function passResponseToAsyncFilter(dr)
      print('in passResponseToAsyncFilter')
      local timeout = 500 -- 500 ms

      local responsePtr = C.dnsdist_ffi_dnsquestion_get_header(dr)
      local responseSize = C.dnsdist_ffi_dnsquestion_get_len(dr)

      -- we need to take a copy, as we can no longer touch that data after calling set_async
      local buffer = ffi.string(responsePtr, responseSize)

      asyncObjectsMap[C.dnsdist_ffi_dnsquestion_get_id(dr)] = getQNameRaw(dr)

      C.dnsdist_ffi_dnsresponse_set_async(dr, asyncID, C.dnsdist_ffi_dnsquestion_get_id(dr), timeout)
      asyncResponderEndpoint:send(buffer)

      return DNSResponseAction.Allow
    end

    -- this only matters for tests actually reaching the backend
    addAction('tcp-only.async.tests.powerdns.com', PoolAction('tcp-only', false))
    addAction('cache.async.tests.powerdns.com', PoolAction('cache', false))
    addAction(AllRule(), LuaFFIAction(passQueryToAsyncFilter))
    addCacheHitResponseAction(AllRule(), LuaFFIResponseAction(passResponseToAsyncFilter))
    addResponseAction(AllRule(), LuaFFIResponseAction(passResponseToAsyncFilter))
    """
    _asyncResponderSocketPath = asyncResponderSocketPath
    _dnsdistSocketPath = dnsdistSocketPath
    # attribute names, one per placeholder of _config_template and in the
    # same order (one group per line of the template)
    _config_params = [
        '_testServerPort',
        '_testServerPort',
        '_tlsServerPort', '_serverCert', '_serverKey',
        '_dohWithH2OServerPort', '_serverCert', '_serverKey',
        '_dohWithNGHTTP2ServerPort', '_serverCert', '_serverKey',
        '_doqServerPort', '_serverCert', '_serverKey',
        '_asyncResponderSocketPath',
        '_dnsdistSocketPath',
    ]
    _verboseMode = True
524
@unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
class TestAsyncLua(DNSDistTest, AsyncTests):
    """
    Run the AsyncTests cases against a configuration using the regular Lua API.

    The configuration sends every query and every response to the
    asynchronous responder through LuaAction / LuaResponseAction, suspends it
    with suspend(), and applies the verdict it gets back on the object
    returned by getAsynchronousObject().
    """
    # the '%d' / '%s' placeholders are filled from _config_params, in order;
    # '%%' is a literal percent sign in the resulting Lua code
    _config_template = """
    newServer{address="127.0.0.1:%d", pool={'', 'cache'}}
    newServer{address="127.0.0.1:%d", pool="tcp-only", tcpOnly=true }

    addTLSLocal("127.0.0.1:%d", "%s", "%s", { provider="openssl" })
    addDOHLocal("127.0.0.1:%d", "%s", "%s", {"/"}, {library="h2o"})
    addDOHLocal("127.0.0.1:%d", "%s", "%s", {"/"}, {library="nghttp2"})
    addDOQLocal("127.0.0.1:%d", "%s", "%s")

    local filteringTagName = 'filtering'
    local filteringTagValue = 'pass'
    local asyncID = 0

    pc = newPacketCache(100)
    getPool('cache'):setCache(pc)

    function gotAsyncResponse(endpointID, message, from)

      print('Got async response '..message)
      local parts = {}
      for part in message:gmatch("%%S+") do
        table.insert(parts, part)
      end
      if #parts ~= 2 then
        print('Invalid message')
        return
      end
      local queryID = tonumber(parts[1])
      local asyncObject = getAsynchronousObject(asyncID, queryID)
      if parts[2] == 'accept' then
        print('accepting')
        local dq = asyncObject:getDQ()
        dq:setTag(filteringTagName, filteringTagValue)
        asyncObject:resume()
        return
      end
      if parts[2] == 'refuse' then
        print('refusing')
        local dq = asyncObject:getDQ()
        asyncObject:setRCode(DNSRCode.REFUSED, true)
        asyncObject:resume()
        return
      end
      if parts[2] == 'drop' then
        print('dropping')
        asyncObject:drop()
        return
      end
      if parts[2] == 'custom' then
        print('sending a custom response')
        local dq = asyncObject:getDQ()
        local raw
        if tostring(dq.qname) == 'custom.async.tests.powerdns.com.' then
          raw = '\\000\\000\\128\\129\\000\\001\\000\\000\\000\\000\\000\\001\\006custom\\005async\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001\\000\\000\\041\\002\\000\\000\\000\\128\\000\\000\\000'
        elseif tostring(dq.qname) == 'accept-then-custom.async.tests.powerdns.com.' then
          raw = '\\000\\000\\128\\129\\000\\001\\000\\000\\000\\000\\000\\001\\018accept-then-custom\\005async\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001\\000\\000\\041\\002\\000\\000\\000\\128\\000\\000\\000'
        elseif tostring(dq.qname) == 'accept-then-custom.tcp-only.async.tests.powerdns.com.' then
          raw = '\\000\\000\\128\\129\\000\\001\\000\\000\\000\\000\\000\\001\\018accept-then-custom\\008tcp-only\\005async\\005tests\\008powerdns\\003com\\000\\000\\001\\000\\001\\000\\000\\041\\002\\000\\000\\000\\128\\000\\000\\000'
        end
        dq:setContent(raw)
        asyncObject:resume()
        return
      end
    end

    asyncResponderEndpoint = newNetworkEndpoint('%s')
    listener = newNetworkListener()
    listener:addUnixListeningEndpoint('%s', 0, gotAsyncResponse)
    listener:start()

    function passQueryToAsyncFilter(dq)
      print('in passQueryToAsyncFilter')
      local timeout = 500 -- 500 ms

      local buffer = dq:getContent()
      local id = dq.dh:getID()
      dq:suspend(asyncID, id, timeout)
      asyncResponderEndpoint:send(buffer)

      return DNSAction.Allow
    end

    function passResponseToAsyncFilter(dr)
      print('in passResponseToAsyncFilter')
      local timeout = 500 -- 500 ms

      local buffer = dr:getContent()
      local id = dr.dh:getID()
      dr:suspend(asyncID, id, timeout)
      asyncResponderEndpoint:send(buffer)

      return DNSResponseAction.Allow
    end

    -- this only matters for tests actually reaching the backend
    addAction('tcp-only.async.tests.powerdns.com', PoolAction('tcp-only', false))
    addAction('cache.async.tests.powerdns.com', PoolAction('cache', false))
    addAction(AllRule(), LuaAction(passQueryToAsyncFilter))
    addCacheHitResponseAction(AllRule(), LuaResponseAction(passResponseToAsyncFilter))
    addResponseAction(AllRule(), LuaResponseAction(passResponseToAsyncFilter))
    """
    _asyncResponderSocketPath = asyncResponderSocketPath
    _dnsdistSocketPath = dnsdistSocketPath
    # attribute names, one per placeholder of _config_template and in the
    # same order (one group per line of the template)
    _config_params = [
        '_testServerPort',
        '_testServerPort',
        '_tlsServerPort', '_serverCert', '_serverKey',
        '_dohWithH2OServerPort', '_serverCert', '_serverKey',
        '_dohWithNGHTTP2ServerPort', '_serverCert', '_serverKey',
        '_doqServerPort', '_serverCert', '_serverKey',
        '_asyncResponderSocketPath',
        '_dnsdistSocketPath',
    ]
    _verboseMode = True