]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
Merge pull request #7941 from rgacogne/rec-records-from-gettag-ffi
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 import pycurl
8 from io import BytesIO
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
11
12 class DNSDistDOHTest(DNSDistTest):
13
14 @classmethod
15 def getDOHGetURL(cls, baseurl, query, rawQuery=False):
16 if rawQuery:
17 wire = query
18 else:
19 wire = query.to_wire()
20 param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
21 return baseurl + "?dns=" + param
22
23 @classmethod
24 def openDOHConnection(cls, port, caFile, timeout=2.0):
25 conn = pycurl.Curl()
26 conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
27
28 conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
29 "Accept: application/dns-message"])
30 return conn
31
32 @classmethod
33 def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=[]):
34 url = cls.getDOHGetURL(baseurl, query, rawQuery)
35 conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
36 response_headers = BytesIO()
37 #conn.setopt(pycurl.VERBOSE, True)
38 conn.setopt(pycurl.URL, url)
39 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
40 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
41 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
42 conn.setopt(pycurl.HTTPHEADER, customHeaders)
43 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
44 if caFile:
45 conn.setopt(pycurl.CAINFO, caFile)
46
47 if response:
48 cls._toResponderQueue.put(response, True, timeout)
49
50 receivedQuery = None
51 message = None
52 cls._response_headers = ''
53 data = conn.perform_rb()
54 cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
55 if cls._rcode == 200 and not rawResponse:
56 message = dns.message.from_wire(data)
57 elif rawResponse:
58 message = data
59
60 if useQueue and not cls._fromResponderQueue.empty():
61 receivedQuery = cls._fromResponderQueue.get(True, timeout)
62
63 cls._response_headers = response_headers.getvalue()
64 return (receivedQuery, message)
65
66 @classmethod
67 def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=[]):
68 url = baseurl
69 conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
70 response_headers = BytesIO()
71 #conn.setopt(pycurl.VERBOSE, True)
72 conn.setopt(pycurl.URL, url)
73 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
74 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
75 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
76 conn.setopt(pycurl.HTTPHEADER, customHeaders)
77 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
78 conn.setopt(pycurl.POST, True)
79 data = query
80 if not rawQuery:
81 data = data.to_wire()
82
83 conn.setopt(pycurl.POSTFIELDS, data)
84
85 if caFile:
86 conn.setopt(pycurl.CAINFO, caFile)
87
88 if response:
89 cls._toResponderQueue.put(response, True, timeout)
90
91 receivedQuery = None
92 message = None
93 cls._response_headers = ''
94 data = conn.perform_rb()
95 cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
96 if cls._rcode == 200 and not rawResponse:
97 message = dns.message.from_wire(data)
98 elif rawResponse:
99 message = data
100
101 if useQueue and not cls._fromResponderQueue.empty():
102 receivedQuery = cls._fromResponderQueue.get(True, timeout)
103
104 cls._response_headers = response_headers.getvalue()
105 return (receivedQuery, message)
106
107 # @classmethod
108 # def openDOHConnection(cls, port, caFile, timeout=2.0):
109 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
110 # sslctx.load_verify_locations(caFile)
111 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
112
113 # @classmethod
114 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
115 # url = cls.getDOHGetURL(baseurl, query)
116
117 # if response:
118 # cls._toResponderQueue.put(response, True, timeout)
119
120 # conn.request('GET', url)
121
122 # @classmethod
123 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
124 # message = None
125 # data = conn.get_response()
126 # if data:
127 # data = data.read()
128 # if data:
129 # message = dns.message.from_wire(data)
130
131 # if useQueue and not cls._fromResponderQueue.empty():
132 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
133 # return (receivedQuery, message)
134 # else:
135 # return message
136
137 class TestDOH(DNSDistDOHTest):
138
139 _serverKey = 'server.key'
140 _serverCert = 'server.chain'
141 _serverName = 'tls.tests.dnsdist.org'
142 _caCert = 'ca.pem'
143 _dohServerPort = 8443
144 _customResponseHeader1 = 'access-control-allow-origin: *'
145 _customResponseHeader2 = 'user-agent: derp'
146 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
147 _config_template = """
148 newServer{address="127.0.0.1:%s"}
149
150 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp"}})
151
152 addAction("drop.doh.tests.powerdns.com.", DropAction())
153 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
154 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
155 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
156 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
157 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
158 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
159 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
160
161 function dohHandler(dq)
162 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
163 local foundct = false
164 for key,value in pairs(dq:getHTTPHeaders()) do
165 if key == 'content-type' and value == 'application/dns-message' then
166 foundct = true
167 break
168 end
169 end
170 if foundct then
171 dq:setHTTPResponse(200, 'It works!', 'text/plain')
172 dq.dh:setQR(true)
173 return DNSAction.HeaderModify
174 end
175 end
176 return DNSAction.None
177 end
178 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
179 """
180 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
181
182 def testDOHSimple(self):
183 """
184 DOH: Simple query
185 """
186 name = 'simple.doh.tests.powerdns.com.'
187 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
188 query.id = 0
189 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
190 expectedQuery.id = 0
191 response = dns.message.make_response(query)
192 rrset = dns.rrset.from_text(name,
193 3600,
194 dns.rdataclass.IN,
195 dns.rdatatype.A,
196 '127.0.0.1')
197 response.answer.append(rrset)
198
199 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
200 self.assertTrue(receivedQuery)
201 self.assertTrue(receivedResponse)
202 receivedQuery.id = expectedQuery.id
203 self.assertEquals(expectedQuery, receivedQuery)
204 self.assertTrue((self._customResponseHeader1) in self._response_headers.decode())
205 self.assertTrue((self._customResponseHeader2) in self._response_headers.decode())
206 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
207 self.assertEquals(response, receivedResponse)
208
209 def testDOHSimplePOST(self):
210 """
211 DOH: Simple POST query
212 """
213 name = 'simple-post.doh.tests.powerdns.com.'
214 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
215 query.id = 0
216 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
217 expectedQuery.id = 0
218 response = dns.message.make_response(query)
219 rrset = dns.rrset.from_text(name,
220 3600,
221 dns.rdataclass.IN,
222 dns.rdatatype.A,
223 '127.0.0.1')
224 response.answer.append(rrset)
225
226 (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
227 self.assertTrue(receivedQuery)
228 self.assertTrue(receivedResponse)
229 receivedQuery.id = expectedQuery.id
230 self.assertEquals(expectedQuery, receivedQuery)
231 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
232 self.assertEquals(response, receivedResponse)
233
234 def testDOHExistingEDNS(self):
235 """
236 DOH: Existing EDNS
237 """
238 name = 'existing-edns.doh.tests.powerdns.com.'
239 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
240 query.id = 0
241 response = dns.message.make_response(query)
242 rrset = dns.rrset.from_text(name,
243 3600,
244 dns.rdataclass.IN,
245 dns.rdatatype.A,
246 '127.0.0.1')
247 response.answer.append(rrset)
248
249 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
250 self.assertTrue(receivedQuery)
251 self.assertTrue(receivedResponse)
252 receivedQuery.id = query.id
253 self.assertEquals(query, receivedQuery)
254 self.assertEquals(response, receivedResponse)
255 self.checkQueryEDNSWithoutECS(query, receivedQuery)
256 self.checkResponseEDNSWithoutECS(response, receivedResponse)
257
258 def testDOHExistingECS(self):
259 """
260 DOH: Existing EDNS Client Subnet
261 """
262 name = 'existing-ecs.doh.tests.powerdns.com.'
263 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
264 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
265 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
266 query.id = 0
267 response = dns.message.make_response(query)
268 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
269 rrset = dns.rrset.from_text(name,
270 3600,
271 dns.rdataclass.IN,
272 dns.rdatatype.A,
273 '127.0.0.1')
274 response.answer.append(rrset)
275
276 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
277 self.assertTrue(receivedQuery)
278 self.assertTrue(receivedResponse)
279 receivedQuery.id = query.id
280 self.assertEquals(query, receivedQuery)
281 self.assertEquals(response, receivedResponse)
282 self.checkQueryEDNSWithECS(query, receivedQuery)
283 self.checkResponseEDNSWithECS(response, receivedResponse)
284
285 def testDropped(self):
286 """
287 DOH: Dropped query
288 """
289 name = 'drop.doh.tests.powerdns.com.'
290 query = dns.message.make_query(name, 'A', 'IN')
291 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
292 self.assertEquals(receivedResponse, None)
293
294 def testRefused(self):
295 """
296 DOH: Refused
297 """
298 name = 'refused.doh.tests.powerdns.com.'
299 query = dns.message.make_query(name, 'A', 'IN')
300 query.id = 0
301 expectedResponse = dns.message.make_response(query)
302 expectedResponse.set_rcode(dns.rcode.REFUSED)
303
304 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
305 self.assertEquals(receivedResponse, expectedResponse)
306
307 def testSpoof(self):
308 """
309 DOH: Spoofed
310 """
311 name = 'spoof.doh.tests.powerdns.com.'
312 query = dns.message.make_query(name, 'A', 'IN')
313 query.id = 0
314 query.flags &= ~dns.flags.RD
315 expectedResponse = dns.message.make_response(query)
316 rrset = dns.rrset.from_text(name,
317 3600,
318 dns.rdataclass.IN,
319 dns.rdatatype.A,
320 '1.2.3.4')
321 expectedResponse.answer.append(rrset)
322
323 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
324 self.assertEquals(receivedResponse, expectedResponse)
325
326 def testDOHInvalid(self):
327 """
328 DOH: Invalid query
329 """
330 name = 'invalid.doh.tests.powerdns.com.'
331 invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
332 invalidQuery.id = 0
333 # first an invalid query
334 invalidQuery = invalidQuery.to_wire()
335 invalidQuery = invalidQuery[:-5]
336 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
337 self.assertEquals(receivedResponse, None)
338
339 # and now a valid one
340 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
341 query.id = 0
342 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
343 expectedQuery.id = 0
344 response = dns.message.make_response(query)
345 rrset = dns.rrset.from_text(name,
346 3600,
347 dns.rdataclass.IN,
348 dns.rdatatype.A,
349 '127.0.0.1')
350 response.answer.append(rrset)
351 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
352 self.assertTrue(receivedQuery)
353 self.assertTrue(receivedResponse)
354 receivedQuery.id = expectedQuery.id
355 self.assertEquals(expectedQuery, receivedQuery)
356 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
357 self.assertEquals(response, receivedResponse)
358
359 def testDOHWithoutQuery(self):
360 """
361 DOH: Empty GET query
362 """
363 name = 'empty-get.doh.tests.powerdns.com.'
364 url = self._dohBaseURL
365 conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
366 conn.setopt(pycurl.URL, url)
367 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
368 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
369 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
370 conn.setopt(pycurl.CAINFO, self._caCert)
371 data = conn.perform_rb()
372 rcode = conn.getinfo(pycurl.RESPONSE_CODE)
373 self.assertEquals(rcode, 400)
374
375 def testDOHEmptyPOST(self):
376 """
377 DOH: Empty POST query
378 """
379 name = 'empty-post.doh.tests.powerdns.com.'
380
381 (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
382 self.assertEquals(receivedResponse, None)
383
384 # and now a valid one
385 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
386 query.id = 0
387 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
388 expectedQuery.id = 0
389 response = dns.message.make_response(query)
390 rrset = dns.rrset.from_text(name,
391 3600,
392 dns.rdataclass.IN,
393 dns.rdatatype.A,
394 '127.0.0.1')
395 response.answer.append(rrset)
396 (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
397 self.assertTrue(receivedQuery)
398 self.assertTrue(receivedResponse)
399 receivedQuery.id = expectedQuery.id
400 self.assertEquals(expectedQuery, receivedQuery)
401 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
402 self.assertEquals(response, receivedResponse)
403
404 def testHeaderRule(self):
405 """
406 DOH: HeaderRule
407 """
408 name = 'header-rule.doh.tests.powerdns.com.'
409 query = dns.message.make_query(name, 'A', 'IN')
410 query.id = 0
411 query.flags &= ~dns.flags.RD
412 expectedResponse = dns.message.make_response(query)
413 rrset = dns.rrset.from_text(name,
414 3600,
415 dns.rdataclass.IN,
416 dns.rdatatype.A,
417 '2.3.4.5')
418 expectedResponse.answer.append(rrset)
419
420 # this header should match
421 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
422 self.assertEquals(receivedResponse, expectedResponse)
423
424 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
425 expectedQuery.flags &= ~dns.flags.RD
426 expectedQuery.id = 0
427 response = dns.message.make_response(query)
428 rrset = dns.rrset.from_text(name,
429 3600,
430 dns.rdataclass.IN,
431 dns.rdatatype.A,
432 '127.0.0.1')
433 response.answer.append(rrset)
434
435 # this content of the header should NOT match
436 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
437 self.assertTrue(receivedQuery)
438 self.assertTrue(receivedResponse)
439 receivedQuery.id = expectedQuery.id
440 self.assertEquals(expectedQuery, receivedQuery)
441 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
442 self.assertEquals(response, receivedResponse)
443
444 def testHTTPPath(self):
445 """
446 DOH: HTTPPath
447 """
448 name = 'http-path.doh.tests.powerdns.com.'
449 query = dns.message.make_query(name, 'A', 'IN')
450 query.id = 0
451 query.flags &= ~dns.flags.RD
452 expectedResponse = dns.message.make_response(query)
453 rrset = dns.rrset.from_text(name,
454 3600,
455 dns.rdataclass.IN,
456 dns.rdatatype.A,
457 '3.4.5.6')
458 expectedResponse.answer.append(rrset)
459
460 # this path should match
461 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
462 self.assertEquals(receivedResponse, expectedResponse)
463
464 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
465 expectedQuery.id = 0
466 expectedQuery.flags &= ~dns.flags.RD
467 response = dns.message.make_response(query)
468 rrset = dns.rrset.from_text(name,
469 3600,
470 dns.rdataclass.IN,
471 dns.rdatatype.A,
472 '127.0.0.1')
473 response.answer.append(rrset)
474
475 # this path should NOT match
476 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
477 self.assertTrue(receivedQuery)
478 self.assertTrue(receivedResponse)
479 receivedQuery.id = expectedQuery.id
480 self.assertEquals(expectedQuery, receivedQuery)
481 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
482 self.assertEquals(response, receivedResponse)
483
484 def testHTTPPathRegex(self):
485 """
486 DOH: HTTPPathRegex
487 """
488 name = 'http-path-regex.doh.tests.powerdns.com.'
489 query = dns.message.make_query(name, 'A', 'IN')
490 query.id = 0
491 query.flags &= ~dns.flags.RD
492 expectedResponse = dns.message.make_response(query)
493 rrset = dns.rrset.from_text(name,
494 3600,
495 dns.rdataclass.IN,
496 dns.rdatatype.A,
497 '6.7.8.9')
498 expectedResponse.answer.append(rrset)
499
500 # this path should match
501 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS-999', caFile=self._caCert, query=query, response=None, useQueue=False)
502 self.assertEquals(receivedResponse, expectedResponse)
503
504 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
505 expectedQuery.id = 0
506 expectedQuery.flags &= ~dns.flags.RD
507 response = dns.message.make_response(query)
508 rrset = dns.rrset.from_text(name,
509 3600,
510 dns.rdataclass.IN,
511 dns.rdatatype.A,
512 '127.0.0.1')
513 response.answer.append(rrset)
514
515 # this path should NOT match
516 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
517 self.assertTrue(receivedQuery)
518 self.assertTrue(receivedResponse)
519 receivedQuery.id = expectedQuery.id
520 self.assertEquals(expectedQuery, receivedQuery)
521 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
522 self.assertEquals(response, receivedResponse)
523
524 def testHTTPStatusAction200(self):
525 """
526 DOH: HTTPStatusAction 200 OK
527 """
528 name = 'http-status-action.doh.tests.powerdns.com.'
529 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
530 query.id = 0
531
532 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
533 self.assertTrue(receivedResponse)
534 self.assertEquals(receivedResponse, b'Plaintext answer')
535 self.assertEquals(self._rcode, 200)
536 self.assertTrue('content-type: text/plain' in self._response_headers.decode())
537
538 def testHTTPStatusAction307(self):
539 """
540 DOH: HTTPStatusAction 307
541 """
542 name = 'http-status-action-redirect.doh.tests.powerdns.com.'
543 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
544 query.id = 0
545
546 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
547 self.assertTrue(receivedResponse)
548 self.assertEquals(self._rcode, 307)
549 self.assertTrue('location: https://doh.powerdns.org' in self._response_headers.decode())
550
551 def testHTTPLuaResponse(self):
552 """
553 DOH: Lua HTTP Response
554 """
555 name = 'http-lua.doh.tests.powerdns.com.'
556 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
557 query.id = 0
558
559 (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
560 self.assertTrue(receivedResponse)
561 self.assertEquals(receivedResponse, b'It works!')
562 self.assertEquals(self._rcode, 200)
563 self.assertTrue('content-type: text/plain' in self._response_headers.decode())
564
565 class TestDOHAddingECS(DNSDistDOHTest):
566
567 _serverKey = 'server.key'
568 _serverCert = 'server.chain'
569 _serverName = 'tls.tests.dnsdist.org'
570 _caCert = 'ca.pem'
571 _dohServerPort = 8443
572 _serverName = 'tls.tests.dnsdist.org'
573 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
574 _config_template = """
575 newServer{address="127.0.0.1:%s", useClientSubnet=true}
576 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
577 setECSOverride(true)
578 """
579 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
580
581 def testDOHSimple(self):
582 """
583 DOH with ECS: Simple query
584 """
585 name = 'simple.doh-ecs.tests.powerdns.com.'
586 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
587 query.id = 0
588 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
589 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
590 response = dns.message.make_response(query)
591 rrset = dns.rrset.from_text(name,
592 3600,
593 dns.rdataclass.IN,
594 dns.rdatatype.A,
595 '127.0.0.1')
596 response.answer.append(rrset)
597
598 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
599 self.assertTrue(receivedQuery)
600 self.assertTrue(receivedResponse)
601 expectedQuery.id = receivedQuery.id
602 self.assertEquals(expectedQuery, receivedQuery)
603 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
604 self.assertEquals(response, receivedResponse)
605 self.checkResponseNoEDNS(response, receivedResponse)
606
607 def testDOHExistingEDNS(self):
608 """
609 DOH with ECS: Existing EDNS
610 """
611 name = 'existing-edns.doh-ecs.tests.powerdns.com.'
612 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
613 query.id = 0
614 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
615 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
616 response = dns.message.make_response(query)
617 rrset = dns.rrset.from_text(name,
618 3600,
619 dns.rdataclass.IN,
620 dns.rdatatype.A,
621 '127.0.0.1')
622 response.answer.append(rrset)
623
624 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
625 self.assertTrue(receivedQuery)
626 self.assertTrue(receivedResponse)
627 receivedQuery.id = expectedQuery.id
628 self.assertEquals(expectedQuery, receivedQuery)
629 self.assertEquals(response, receivedResponse)
630 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
631 self.checkResponseEDNSWithoutECS(response, receivedResponse)
632
633 def testDOHExistingECS(self):
634 """
635 DOH with ECS: Existing EDNS Client Subnet
636 """
637 name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
638 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
639 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
640 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
641 query.id = 0
642 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
643 response = dns.message.make_response(query)
644 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
645 rrset = dns.rrset.from_text(name,
646 3600,
647 dns.rdataclass.IN,
648 dns.rdatatype.A,
649 '127.0.0.1')
650 response.answer.append(rrset)
651
652 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
653 self.assertTrue(receivedQuery)
654 self.assertTrue(receivedResponse)
655 receivedQuery.id = expectedQuery.id
656 self.assertEquals(expectedQuery, receivedQuery)
657 self.assertEquals(response, receivedResponse)
658 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
659 self.checkResponseEDNSWithECS(response, receivedResponse)