]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
Merge pull request #8345 from omoerbeek/format-code-script
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import os
5 import unittest
6 import clientsubnetoption
7 from dnsdisttests import DNSDistTest
8
9 import pycurl
10 from io import BytesIO
11 #from hyper import HTTP20Connection
12 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
13
14 @unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
15 class DNSDistDOHTest(DNSDistTest):
16
17 @classmethod
18 def getDOHGetURL(cls, baseurl, query, rawQuery=False):
19 if rawQuery:
20 wire = query
21 else:
22 wire = query.to_wire()
23 param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
24 return baseurl + "?dns=" + param
25
26 @classmethod
27 def openDOHConnection(cls, port, caFile, timeout=2.0):
28 conn = pycurl.Curl()
29 conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
30
31 conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
32 "Accept: application/dns-message"])
33 return conn
34
35 @classmethod
36 def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=[], useHTTPS=True):
37 url = cls.getDOHGetURL(baseurl, query, rawQuery)
38 conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
39 response_headers = BytesIO()
40 #conn.setopt(pycurl.VERBOSE, True)
41 conn.setopt(pycurl.URL, url)
42 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
43 if useHTTPS:
44 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
45 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
46 if caFile:
47 conn.setopt(pycurl.CAINFO, caFile)
48
49 conn.setopt(pycurl.HTTPHEADER, customHeaders)
50 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
51
52 if response:
53 cls._toResponderQueue.put(response, True, timeout)
54
55 receivedQuery = None
56 message = None
57 cls._response_headers = ''
58 data = conn.perform_rb()
59 cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
60 if cls._rcode == 200 and not rawResponse:
61 message = dns.message.from_wire(data)
62 elif rawResponse:
63 message = data
64
65 if useQueue and not cls._fromResponderQueue.empty():
66 receivedQuery = cls._fromResponderQueue.get(True, timeout)
67
68 cls._response_headers = response_headers.getvalue()
69 return (receivedQuery, message)
70
71 @classmethod
72 def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=[], useHTTPS=True):
73 url = baseurl
74 conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
75 response_headers = BytesIO()
76 #conn.setopt(pycurl.VERBOSE, True)
77 conn.setopt(pycurl.URL, url)
78 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
79 if useHTTPS:
80 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
81 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
82 if caFile:
83 conn.setopt(pycurl.CAINFO, caFile)
84
85 conn.setopt(pycurl.HTTPHEADER, customHeaders)
86 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
87 conn.setopt(pycurl.POST, True)
88 data = query
89 if not rawQuery:
90 data = data.to_wire()
91
92 conn.setopt(pycurl.POSTFIELDS, data)
93
94 if response:
95 cls._toResponderQueue.put(response, True, timeout)
96
97 receivedQuery = None
98 message = None
99 cls._response_headers = ''
100 data = conn.perform_rb()
101 cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
102 if cls._rcode == 200 and not rawResponse:
103 message = dns.message.from_wire(data)
104 elif rawResponse:
105 message = data
106
107 if useQueue and not cls._fromResponderQueue.empty():
108 receivedQuery = cls._fromResponderQueue.get(True, timeout)
109
110 cls._response_headers = response_headers.getvalue()
111 return (receivedQuery, message)
112
113 @classmethod
114 def setUpClass(cls):
115
116 # for some reason, @unittest.skipIf() is not applied to derived classes with some versions of Python
117 if 'SKIP_DOH_TESTS' in os.environ:
118 raise unittest.SkipTest('DNS over HTTPS tests are disabled')
119
120 cls.startResponders()
121 cls.startDNSDist()
122 cls.setUpSockets()
123
124 print("Launching tests..")
125
126 # @classmethod
127 # def openDOHConnection(cls, port, caFile, timeout=2.0):
128 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
129 # sslctx.load_verify_locations(caFile)
130 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
131
132 # @classmethod
133 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
134 # url = cls.getDOHGetURL(baseurl, query)
135
136 # if response:
137 # cls._toResponderQueue.put(response, True, timeout)
138
139 # conn.request('GET', url)
140
141 # @classmethod
142 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
143 # message = None
144 # data = conn.get_response()
145 # if data:
146 # data = data.read()
147 # if data:
148 # message = dns.message.from_wire(data)
149
150 # if useQueue and not cls._fromResponderQueue.empty():
151 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
152 # return (receivedQuery, message)
153 # else:
154 # return message
155
156 class TestDOH(DNSDistDOHTest):
157
158 _serverKey = 'server.key'
159 _serverCert = 'server.chain'
160 _serverName = 'tls.tests.dnsdist.org'
161 _caCert = 'ca.pem'
162 _dohServerPort = 8443
163 _customResponseHeader1 = 'access-control-allow-origin: *'
164 _customResponseHeader2 = 'user-agent: derp'
165 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
166 _config_template = """
167 newServer{address="127.0.0.1:%s"}
168
169 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})
170 dohFE = getDOHFrontend(0)
171 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['FoO']='bar'})})
172
173 addAction("drop.doh.tests.powerdns.com.", DropAction())
174 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
175 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
176 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
177 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
178 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
179 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
180 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
181
182 function dohHandler(dq)
183 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
184 local foundct = false
185 for key,value in pairs(dq:getHTTPHeaders()) do
186 if key == 'content-type' and value == 'application/dns-message' then
187 foundct = true
188 break
189 end
190 end
191 if foundct then
192 dq:setHTTPResponse(200, 'It works!', 'text/plain')
193 dq.dh:setQR(true)
194 return DNSAction.HeaderModify
195 end
196 end
197 return DNSAction.None
198 end
199 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
200 """
201 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
202
203 def testDOHSimple(self):
204 """
205 DOH: Simple query
206 """
207 name = 'simple.doh.tests.powerdns.com.'
208 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
209 query.id = 0
210 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
211 expectedQuery.id = 0
212 response = dns.message.make_response(query)
213 rrset = dns.rrset.from_text(name,
214 3600,
215 dns.rdataclass.IN,
216 dns.rdatatype.A,
217 '127.0.0.1')
218 response.answer.append(rrset)
219
220 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
221 self.assertTrue(receivedQuery)
222 self.assertTrue(receivedResponse)
223 receivedQuery.id = expectedQuery.id
224 self.assertEquals(expectedQuery, receivedQuery)
225 self.assertTrue((self._customResponseHeader1) in self._response_headers.decode())
226 self.assertTrue((self._customResponseHeader2) in self._response_headers.decode())
227 self.assertFalse(('UPPERCASE: VaLuE' in self._response_headers.decode()))
228 self.assertTrue(('uppercase: VaLuE' in self._response_headers.decode()))
229 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
230 self.assertEquals(response, receivedResponse)
231
232 def testDOHSimplePOST(self):
233 """
234 DOH: Simple POST query
235 """
236 name = 'simple-post.doh.tests.powerdns.com.'
237 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
238 query.id = 0
239 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
240 expectedQuery.id = 0
241 response = dns.message.make_response(query)
242 rrset = dns.rrset.from_text(name,
243 3600,
244 dns.rdataclass.IN,
245 dns.rdatatype.A,
246 '127.0.0.1')
247 response.answer.append(rrset)
248
249 (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
250 self.assertTrue(receivedQuery)
251 self.assertTrue(receivedResponse)
252 receivedQuery.id = expectedQuery.id
253 self.assertEquals(expectedQuery, receivedQuery)
254 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
255 self.assertEquals(response, receivedResponse)
256
257 def testDOHExistingEDNS(self):
258 """
259 DOH: Existing EDNS
260 """
261 name = 'existing-edns.doh.tests.powerdns.com.'
262 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
263 query.id = 0
264 response = dns.message.make_response(query)
265 rrset = dns.rrset.from_text(name,
266 3600,
267 dns.rdataclass.IN,
268 dns.rdatatype.A,
269 '127.0.0.1')
270 response.answer.append(rrset)
271
272 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
273 self.assertTrue(receivedQuery)
274 self.assertTrue(receivedResponse)
275 receivedQuery.id = query.id
276 self.assertEquals(query, receivedQuery)
277 self.assertEquals(response, receivedResponse)
278 self.checkQueryEDNSWithoutECS(query, receivedQuery)
279 self.checkResponseEDNSWithoutECS(response, receivedResponse)
280
281 def testDOHExistingECS(self):
282 """
283 DOH: Existing EDNS Client Subnet
284 """
285 name = 'existing-ecs.doh.tests.powerdns.com.'
286 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
287 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
288 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
289 query.id = 0
290 response = dns.message.make_response(query)
291 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
292 rrset = dns.rrset.from_text(name,
293 3600,
294 dns.rdataclass.IN,
295 dns.rdatatype.A,
296 '127.0.0.1')
297 response.answer.append(rrset)
298
299 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
300 self.assertTrue(receivedQuery)
301 self.assertTrue(receivedResponse)
302 receivedQuery.id = query.id
303 self.assertEquals(query, receivedQuery)
304 self.assertEquals(response, receivedResponse)
305 self.checkQueryEDNSWithECS(query, receivedQuery)
306 self.checkResponseEDNSWithECS(response, receivedResponse)
307
308 def testDropped(self):
309 """
310 DOH: Dropped query
311 """
312 name = 'drop.doh.tests.powerdns.com.'
313 query = dns.message.make_query(name, 'A', 'IN')
314 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
315 self.assertEquals(receivedResponse, None)
316
317 def testRefused(self):
318 """
319 DOH: Refused
320 """
321 name = 'refused.doh.tests.powerdns.com.'
322 query = dns.message.make_query(name, 'A', 'IN')
323 query.id = 0
324 expectedResponse = dns.message.make_response(query)
325 expectedResponse.set_rcode(dns.rcode.REFUSED)
326
327 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
328 self.assertEquals(receivedResponse, expectedResponse)
329
330 def testSpoof(self):
331 """
332 DOH: Spoofed
333 """
334 name = 'spoof.doh.tests.powerdns.com.'
335 query = dns.message.make_query(name, 'A', 'IN')
336 query.id = 0
337 query.flags &= ~dns.flags.RD
338 expectedResponse = dns.message.make_response(query)
339 rrset = dns.rrset.from_text(name,
340 3600,
341 dns.rdataclass.IN,
342 dns.rdatatype.A,
343 '1.2.3.4')
344 expectedResponse.answer.append(rrset)
345
346 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
347 self.assertEquals(receivedResponse, expectedResponse)
348
349 def testDOHInvalid(self):
350 """
351 DOH: Invalid query
352 """
353 name = 'invalid.doh.tests.powerdns.com.'
354 invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
355 invalidQuery.id = 0
356 # first an invalid query
357 invalidQuery = invalidQuery.to_wire()
358 invalidQuery = invalidQuery[:-5]
359 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
360 self.assertEquals(receivedResponse, None)
361
362 # and now a valid one
363 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
364 query.id = 0
365 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
366 expectedQuery.id = 0
367 response = dns.message.make_response(query)
368 rrset = dns.rrset.from_text(name,
369 3600,
370 dns.rdataclass.IN,
371 dns.rdatatype.A,
372 '127.0.0.1')
373 response.answer.append(rrset)
374 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
375 self.assertTrue(receivedQuery)
376 self.assertTrue(receivedResponse)
377 receivedQuery.id = expectedQuery.id
378 self.assertEquals(expectedQuery, receivedQuery)
379 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
380 self.assertEquals(response, receivedResponse)
381
382 def testDOHWithoutQuery(self):
383 """
384 DOH: Empty GET query
385 """
386 name = 'empty-get.doh.tests.powerdns.com.'
387 url = self._dohBaseURL
388 conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
389 conn.setopt(pycurl.URL, url)
390 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
391 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
392 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
393 conn.setopt(pycurl.CAINFO, self._caCert)
394 data = conn.perform_rb()
395 rcode = conn.getinfo(pycurl.RESPONSE_CODE)
396 self.assertEquals(rcode, 400)
397
398 def testDOHEmptyPOST(self):
399 """
400 DOH: Empty POST query
401 """
402 name = 'empty-post.doh.tests.powerdns.com.'
403
404 (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
405 self.assertEquals(receivedResponse, None)
406
407 # and now a valid one
408 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
409 query.id = 0
410 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
411 expectedQuery.id = 0
412 response = dns.message.make_response(query)
413 rrset = dns.rrset.from_text(name,
414 3600,
415 dns.rdataclass.IN,
416 dns.rdatatype.A,
417 '127.0.0.1')
418 response.answer.append(rrset)
419 (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
420 self.assertTrue(receivedQuery)
421 self.assertTrue(receivedResponse)
422 receivedQuery.id = expectedQuery.id
423 self.assertEquals(expectedQuery, receivedQuery)
424 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
425 self.assertEquals(response, receivedResponse)
426
427 def testHeaderRule(self):
428 """
429 DOH: HeaderRule
430 """
431 name = 'header-rule.doh.tests.powerdns.com.'
432 query = dns.message.make_query(name, 'A', 'IN')
433 query.id = 0
434 query.flags &= ~dns.flags.RD
435 expectedResponse = dns.message.make_response(query)
436 rrset = dns.rrset.from_text(name,
437 3600,
438 dns.rdataclass.IN,
439 dns.rdatatype.A,
440 '2.3.4.5')
441 expectedResponse.answer.append(rrset)
442
443 # this header should match
444 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
445 self.assertEquals(receivedResponse, expectedResponse)
446
447 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
448 expectedQuery.flags &= ~dns.flags.RD
449 expectedQuery.id = 0
450 response = dns.message.make_response(query)
451 rrset = dns.rrset.from_text(name,
452 3600,
453 dns.rdataclass.IN,
454 dns.rdatatype.A,
455 '127.0.0.1')
456 response.answer.append(rrset)
457
458 # this content of the header should NOT match
459 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
460 self.assertTrue(receivedQuery)
461 self.assertTrue(receivedResponse)
462 receivedQuery.id = expectedQuery.id
463 self.assertEquals(expectedQuery, receivedQuery)
464 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
465 self.assertEquals(response, receivedResponse)
466
467 def testHTTPPath(self):
468 """
469 DOH: HTTPPath
470 """
471 name = 'http-path.doh.tests.powerdns.com.'
472 query = dns.message.make_query(name, 'A', 'IN')
473 query.id = 0
474 query.flags &= ~dns.flags.RD
475 expectedResponse = dns.message.make_response(query)
476 rrset = dns.rrset.from_text(name,
477 3600,
478 dns.rdataclass.IN,
479 dns.rdatatype.A,
480 '3.4.5.6')
481 expectedResponse.answer.append(rrset)
482
483 # this path should match
484 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
485 self.assertEquals(receivedResponse, expectedResponse)
486
487 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
488 expectedQuery.id = 0
489 expectedQuery.flags &= ~dns.flags.RD
490 response = dns.message.make_response(query)
491 rrset = dns.rrset.from_text(name,
492 3600,
493 dns.rdataclass.IN,
494 dns.rdatatype.A,
495 '127.0.0.1')
496 response.answer.append(rrset)
497
498 # this path should NOT match
499 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
500 self.assertTrue(receivedQuery)
501 self.assertTrue(receivedResponse)
502 receivedQuery.id = expectedQuery.id
503 self.assertEquals(expectedQuery, receivedQuery)
504 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
505 self.assertEquals(response, receivedResponse)
506
507 def testHTTPPathRegex(self):
508 """
509 DOH: HTTPPathRegex
510 """
511 name = 'http-path-regex.doh.tests.powerdns.com.'
512 query = dns.message.make_query(name, 'A', 'IN')
513 query.id = 0
514 query.flags &= ~dns.flags.RD
515 expectedResponse = dns.message.make_response(query)
516 rrset = dns.rrset.from_text(name,
517 3600,
518 dns.rdataclass.IN,
519 dns.rdatatype.A,
520 '6.7.8.9')
521 expectedResponse.answer.append(rrset)
522
523 # this path should match
524 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS-999', caFile=self._caCert, query=query, response=None, useQueue=False)
525 self.assertEquals(receivedResponse, expectedResponse)
526
527 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
528 expectedQuery.id = 0
529 expectedQuery.flags &= ~dns.flags.RD
530 response = dns.message.make_response(query)
531 rrset = dns.rrset.from_text(name,
532 3600,
533 dns.rdataclass.IN,
534 dns.rdatatype.A,
535 '127.0.0.1')
536 response.answer.append(rrset)
537
538 # this path should NOT match
539 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
540 self.assertTrue(receivedQuery)
541 self.assertTrue(receivedResponse)
542 receivedQuery.id = expectedQuery.id
543 self.assertEquals(expectedQuery, receivedQuery)
544 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
545 self.assertEquals(response, receivedResponse)
546
547 def testHTTPStatusAction200(self):
548 """
549 DOH: HTTPStatusAction 200 OK
550 """
551 name = 'http-status-action.doh.tests.powerdns.com.'
552 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
553 query.id = 0
554
555 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
556 self.assertTrue(receivedResponse)
557 self.assertEquals(receivedResponse, b'Plaintext answer')
558 self.assertEquals(self._rcode, 200)
559 self.assertTrue('content-type: text/plain' in self._response_headers.decode())
560
561 def testHTTPStatusAction307(self):
562 """
563 DOH: HTTPStatusAction 307
564 """
565 name = 'http-status-action-redirect.doh.tests.powerdns.com.'
566 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
567 query.id = 0
568
569 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
570 self.assertTrue(receivedResponse)
571 self.assertEquals(self._rcode, 307)
572 self.assertTrue('location: https://doh.powerdns.org' in self._response_headers.decode())
573
574 def testHTTPLuaResponse(self):
575 """
576 DOH: Lua HTTP Response
577 """
578 name = 'http-lua.doh.tests.powerdns.com.'
579 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
580 query.id = 0
581
582 (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
583 self.assertTrue(receivedResponse)
584 self.assertEquals(receivedResponse, b'It works!')
585 self.assertEquals(self._rcode, 200)
586 self.assertTrue('content-type: text/plain' in self._response_headers.decode())
587
588 def testHTTPEarlyResponse(self):
589 """
590 DOH: HTTP Early Response
591 """
592 response_headers = BytesIO()
593 url = self._dohBaseURL + 'coffee'
594 conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
595 conn.setopt(pycurl.URL, url)
596 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
597 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
598 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
599 conn.setopt(pycurl.CAINFO, self._caCert)
600 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
601 data = conn.perform_rb()
602 rcode = conn.getinfo(pycurl.RESPONSE_CODE)
603 headers = response_headers.getvalue().decode()
604
605 self.assertEquals(rcode, 418)
606 self.assertEquals(data, b'C0FFEE')
607 self.assertIn('foo: bar', headers)
608 self.assertNotIn(self._customResponseHeader2, headers)
609
610 response_headers = BytesIO()
611 conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
612 conn.setopt(pycurl.URL, url)
613 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
614 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
615 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
616 conn.setopt(pycurl.CAINFO, self._caCert)
617 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
618 conn.setopt(pycurl.POST, True)
619 data = ''
620 conn.setopt(pycurl.POSTFIELDS, data)
621
622 data = conn.perform_rb()
623 rcode = conn.getinfo(pycurl.RESPONSE_CODE)
624 headers = response_headers.getvalue().decode()
625 self.assertEquals(rcode, 418)
626 self.assertEquals(data, b'C0FFEE')
627 self.assertIn('foo: bar', headers)
628 self.assertNotIn(self._customResponseHeader2, headers)
629
630 class TestDOHAddingECS(DNSDistDOHTest):
631
632 _serverKey = 'server.key'
633 _serverCert = 'server.chain'
634 _serverName = 'tls.tests.dnsdist.org'
635 _caCert = 'ca.pem'
636 _dohServerPort = 8443
637 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
638 _config_template = """
639 newServer{address="127.0.0.1:%s", useClientSubnet=true}
640 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
641 setECSOverride(true)
642 """
643 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
644
645 def testDOHSimple(self):
646 """
647 DOH with ECS: Simple query
648 """
649 name = 'simple.doh-ecs.tests.powerdns.com.'
650 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
651 query.id = 0
652 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
653 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
654 response = dns.message.make_response(query)
655 rrset = dns.rrset.from_text(name,
656 3600,
657 dns.rdataclass.IN,
658 dns.rdatatype.A,
659 '127.0.0.1')
660 response.answer.append(rrset)
661
662 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
663 self.assertTrue(receivedQuery)
664 self.assertTrue(receivedResponse)
665 expectedQuery.id = receivedQuery.id
666 self.assertEquals(expectedQuery, receivedQuery)
667 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
668 self.assertEquals(response, receivedResponse)
669 self.checkResponseNoEDNS(response, receivedResponse)
670
671 def testDOHExistingEDNS(self):
672 """
673 DOH with ECS: Existing EDNS
674 """
675 name = 'existing-edns.doh-ecs.tests.powerdns.com.'
676 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
677 query.id = 0
678 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
679 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
680 response = dns.message.make_response(query)
681 rrset = dns.rrset.from_text(name,
682 3600,
683 dns.rdataclass.IN,
684 dns.rdatatype.A,
685 '127.0.0.1')
686 response.answer.append(rrset)
687
688 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
689 self.assertTrue(receivedQuery)
690 self.assertTrue(receivedResponse)
691 receivedQuery.id = expectedQuery.id
692 self.assertEquals(expectedQuery, receivedQuery)
693 self.assertEquals(response, receivedResponse)
694 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
695 self.checkResponseEDNSWithoutECS(response, receivedResponse)
696
697 def testDOHExistingECS(self):
698 """
699 DOH with ECS: Existing EDNS Client Subnet
700 """
701 name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
702 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
703 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
704 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
705 query.id = 0
706 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
707 response = dns.message.make_response(query)
708 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
709 rrset = dns.rrset.from_text(name,
710 3600,
711 dns.rdataclass.IN,
712 dns.rdatatype.A,
713 '127.0.0.1')
714 response.answer.append(rrset)
715
716 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
717 self.assertTrue(receivedQuery)
718 self.assertTrue(receivedResponse)
719 receivedQuery.id = expectedQuery.id
720 self.assertEquals(expectedQuery, receivedQuery)
721 self.assertEquals(response, receivedResponse)
722 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
723 self.checkResponseEDNSWithECS(response, receivedResponse)
724
725 class TestDOHOverHTTP(DNSDistDOHTest):
726
727 _dohServerPort = 8480
728 _serverName = 'tls.tests.dnsdist.org'
729 _dohBaseURL = ("http://%s:%d/" % (_serverName, _dohServerPort))
730 _config_template = """
731 newServer{address="127.0.0.1:%s"}
732 addDOHLocal("127.0.0.1:%s")
733 """
734 _config_params = ['_testServerPort', '_dohServerPort']
735
736 def testDOHSimple(self):
737 """
738 DOH over HTTP: Simple query
739 """
740 name = 'simple.doh-over-http.tests.powerdns.com.'
741 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
742 query.id = 0
743 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
744 response = dns.message.make_response(query)
745 rrset = dns.rrset.from_text(name,
746 3600,
747 dns.rdataclass.IN,
748 dns.rdatatype.A,
749 '127.0.0.1')
750 response.answer.append(rrset)
751
752 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, useHTTPS=False)
753 self.assertTrue(receivedQuery)
754 self.assertTrue(receivedResponse)
755 expectedQuery.id = receivedQuery.id
756 self.assertEquals(expectedQuery, receivedQuery)
757 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
758 self.assertEquals(response, receivedResponse)
759 self.checkResponseNoEDNS(response, receivedResponse)
760
761 def testDOHSimplePOST(self):
762 """
763 DOH over HTTP: Simple POST query
764 """
765 name = 'simple-post.doh-over-http.tests.powerdns.com.'
766 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
767 query.id = 0
768 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
769 expectedQuery.id = 0
770 response = dns.message.make_response(query)
771 rrset = dns.rrset.from_text(name,
772 3600,
773 dns.rdataclass.IN,
774 dns.rdatatype.A,
775 '127.0.0.1')
776 response.answer.append(rrset)
777
778 (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, useHTTPS=False)
779 self.assertTrue(receivedQuery)
780 self.assertTrue(receivedResponse)
781 receivedQuery.id = expectedQuery.id
782 self.assertEquals(expectedQuery, receivedQuery)
783 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
784 self.assertEquals(response, receivedResponse)
785 self.checkResponseNoEDNS(response, receivedResponse)
786
787 class TestDOHWithCache(DNSDistDOHTest):
788
789 _serverKey = 'server.key'
790 _serverCert = 'server.chain'
791 _serverName = 'tls.tests.dnsdist.org'
792 _caCert = 'ca.pem'
793 _dohServerPort = 8443
794 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
795 _config_template = """
796 newServer{address="127.0.0.1:%s"}
797
798 addDOHLocal("127.0.0.1:%s", "%s", "%s")
799
800 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
801 getPool(""):setCache(pc)
802 """
803 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
804
805 def testDOHCacheLargeAnswer(self):
806 """
807 DOH with cache: Check that we can cache (and retrieve) large answers
808 """
809 numberOfQueries = 10
810 name = 'large.doh-with-cache.tests.powerdns.com.'
811 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
812 query.id = 0
813 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
814 expectedQuery.id = 0
815 response = dns.message.make_response(query)
816 # we prepare a large answer
817 content = ""
818 for i in range(44):
819 if len(content) > 0:
820 content = content + ', '
821 content = content + (str(i)*50)
822 # pad up to 4096
823 content = content + 'A'*40
824
825 rrset = dns.rrset.from_text(name,
826 3600,
827 dns.rdataclass.IN,
828 dns.rdatatype.TXT,
829 content)
830 response.answer.append(rrset)
831 self.assertEquals(len(response.to_wire()), 4096)
832
833 # first query to fill the cache
834 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
835 self.assertTrue(receivedQuery)
836 self.assertTrue(receivedResponse)
837 receivedQuery.id = expectedQuery.id
838 self.assertEquals(expectedQuery, receivedQuery)
839 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
840 self.assertEquals(response, receivedResponse)
841
842 for _ in range(numberOfQueries):
843 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False)
844 self.assertEquals(receivedResponse, response)