git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
Merge pull request #8235 from Habbie/dyn-dup-ptr
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 import pycurl
8 from io import BytesIO
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
11
class DNSDistDOHTest(DNSDistTest):
    """
    Base class for the DNS over HTTPS regression tests.

    Provides pycurl-based helpers to send a DNS query over HTTP(S), using
    either GET or POST. After each request, the HTTP status code is stored
    in cls._rcode and the raw response headers (bytes) in
    cls._response_headers, so that tests can inspect them.
    """

    @classmethod
    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
        """
        Build the URL of a DoH GET request.

        baseurl:  URL the 'dns' parameter is appended to
        query:    the query, either an object exposing to_wire() or, when
                  rawQuery is set, the wire-format bytes themselves
        rawQuery: whether query is already in wire format

        Returns baseurl followed by '?dns=' and the URL-safe base64
        encoding of the query, stripped of its trailing '=' padding.
        """
        wire = query if rawQuery else query.to_wire()
        param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
        return baseurl + "?dns=" + param

    @classmethod
    def openDOHConnection(cls, port, caFile, timeout=2.0):
        """
        Return a new pycurl handle set up for HTTP/2 with the
        'application/dns-message' Content-type and Accept headers.

        port, caFile and timeout are accepted but not used here: the
        callers configure the URL, the resolution and the TLS settings
        on the returned handle themselves.
        """
        conn = pycurl.Curl()
        conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)

        conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                         "Accept: application/dns-message"])
        return conn

    @classmethod
    def _performDOHRequest(cls, port, servername, url, usePost, postData, response, timeout, caFile, useQueue, rawResponse, customHeaders, useHTTPS):
        """
        Send a single DoH request and collect the result; shared by
        sendDOHQuery() (GET) and sendDOHPostQuery() (POST).

        port:          port of the DoH frontend, always reached on 127.0.0.1
        servername:    host name used in the URL, pinned to 127.0.0.1
        url:           full URL to request
        usePost:       send a POST carrying postData instead of a GET
        postData:      body of the POST request, ignored for a GET
        response:      if set, queued on cls._toResponderQueue before the
                       request is sent
        timeout:       timeout used for the queue operations
        caFile:        if set and useHTTPS is set, CA bundle used to
                       validate the server certificate
        useQueue:      whether to look for a query in cls._fromResponderQueue
        rawResponse:   return the raw body instead of a parsed DNS message
        customHeaders: list of HTTP headers passed to pycurl, or None
        useHTTPS:      whether to enable certificate and host validation

        Returns a (receivedQuery, message) tuple. receivedQuery is None
        unless useQueue is set and the queue was not empty. message is the
        raw body if rawResponse is set, the parsed DNS message if the HTTP
        status code was 200, and None otherwise.
        """
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        response_headers = BytesIO()
        # for debugging purposes: conn.setopt(pycurl.VERBOSE, True)
        conn.setopt(pycurl.URL, url)
        # make the server name resolve to the local frontend
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
        if useHTTPS:
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            if caFile:
                conn.setopt(pycurl.CAINFO, caFile)

        # NOTE(review): this is a second HTTPHEADER setopt on top of the one
        # done in openDOHConnection(); how pycurl combines them for an empty
        # versus a non-empty list should be confirmed before changing it
        conn.setopt(pycurl.HTTPHEADER, [] if customHeaders is None else customHeaders)
        conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
        if usePost:
            conn.setopt(pycurl.POST, True)
            conn.setopt(pycurl.POSTFIELDS, postData)

        if response:
            cls._toResponderQueue.put(response, True, timeout)

        receivedQuery = None
        message = None
        # bytes, like the value stored after the request, so that callers
        # can always call decode() on it
        cls._response_headers = b''
        try:
            data = conn.perform_rb()
            cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            # release the handle even if the transfer failed
            conn.close()

        if rawResponse:
            message = data
        elif cls._rcode == 200:
            message = dns.message.from_wire(data)

        if useQueue and not cls._fromResponderQueue.empty():
            receivedQuery = cls._fromResponderQueue.get(True, timeout)

        cls._response_headers = response_headers.getvalue()
        return (receivedQuery, message)

    @classmethod
    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=None, useHTTPS=True):
        """
        Send query over DoH using a GET request to baseurl.

        query is encoded into the 'dns' URL parameter, see getDOHGetURL();
        rawQuery indicates that it is already in wire format. The other
        parameters and the returned (receivedQuery, message) tuple are
        described in _performDOHRequest().
        """
        url = cls.getDOHGetURL(baseurl, query, rawQuery)
        return cls._performDOHRequest(port, servername, url, False, None, response, timeout, caFile, useQueue, rawResponse, customHeaders, useHTTPS)

    @classmethod
    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=None, useHTTPS=True):
        """
        Send query over DoH using a POST request to baseurl.

        query is sent as the body of the request, after a call to
        to_wire() unless rawQuery is set. The other parameters and the
        returned (receivedQuery, message) tuple are described in
        _performDOHRequest().
        """
        data = query if rawQuery else query.to_wire()
        return cls._performDOHRequest(port, servername, baseurl, True, data, response, timeout, caFile, useQueue, rawResponse, customHeaders, useHTTPS)
109
110 # @classmethod
111 # def openDOHConnection(cls, port, caFile, timeout=2.0):
112 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
113 # sslctx.load_verify_locations(caFile)
114 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
115
116 # @classmethod
117 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
118 # url = cls.getDOHGetURL(baseurl, query)
119
120 # if response:
121 # cls._toResponderQueue.put(response, True, timeout)
122
123 # conn.request('GET', url)
124
125 # @classmethod
126 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
127 # message = None
128 # data = conn.get_response()
129 # if data:
130 # data = data.read()
131 # if data:
132 # message = dns.message.from_wire(data)
133
134 # if useQueue and not cls._fromResponderQueue.empty():
135 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
136 # return (receivedQuery, message)
137 # else:
138 # return message
139
class TestDOH(DNSDistDOHTest):
    """
    Tests against a DoH frontend served over HTTPS, configured with custom
    response headers, a response map entry for '/coffee' and a set of rules
    (drop, refused, spoof, HTTP header, HTTP path, HTTP status, Lua).
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    # headers expected in the responses, as set via customResponseHeaders
    _customResponseHeader1 = 'access-control-allow-origin: *'
    _customResponseHeader2 = 'user-agent: derp'
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})
    dohFE = getDOHFrontend(0)
    dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['FoO']='bar'})})

    addAction("drop.doh.tests.powerdns.com.", DropAction())
    addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
    addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
    addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
    addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
    addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))

    function dohHandler(dq)
      if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
        local foundct = false
        for key,value in pairs(dq:getHTTPHeaders()) do
          if key == 'content-type' and value == 'application/dns-message' then
            foundct = true
            break
          end
        end
        if foundct then
          dq:setHTTPResponse(200, 'It works!', 'text/plain')
          dq.dh:setQR(true)
          return DNSAction.HeaderModify
        end
      end
      return DNSAction.None
    end
    addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']

    def testDOHSimple(self):
        """
        DOH: Simple query
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        headers = self._response_headers.decode()
        self.assertIn(self._customResponseHeader1, headers)
        self.assertIn(self._customResponseHeader2, headers)
        # the name of the custom header is expected in lower case only
        self.assertNotIn('UPPERCASE: VaLuE', headers)
        self.assertIn('uppercase: VaLuE', headers)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH: Simple POST query
        """
        name = 'simple-post.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH: Existing EDNS
        """
        name = 'existing-edns.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithoutECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)

    def testDropped(self):
        """
        DOH: Dropped query
        """
        name = 'drop.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testRefused(self):
        """
        DOH: Refused
        """
        name = 'refused.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOH: Spoofed
        """
        name = 'spoof.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOHInvalid(self):
        """
        DOH: Invalid query
        """
        name = 'invalid.doh.tests.powerdns.com.'
        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        invalidQuery.id = 0
        # first an invalid query, obtained by truncating the wire format
        invalidQuery = invalidQuery.to_wire()[:-5]
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
        self.assertIsNone(receivedResponse)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHWithoutQuery(self):
        """
        DOH: Empty GET query
        """
        # a GET request without any 'dns' parameter
        conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
        conn.setopt(pycurl.URL, self._dohBaseURL)
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        conn.setopt(pycurl.CAINFO, self._caCert)
        conn.perform_rb()
        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        self.assertEqual(rcode, 400)

    def testDOHEmptyPOST(self):
        """
        DOH: Empty POST query
        """
        name = 'empty-post.doh.tests.powerdns.com.'

        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
        self.assertIsNone(receivedResponse)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHeaderRule(self):
        """
        DOH: HeaderRule
        """
        name = 'header-rule.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '2.3.4.5')
        expectedResponse.answer.append(rrset)

        # this header should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        # this content of the header should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPath(self):
        """
        DOH: HTTPPath
        """
        name = 'http-path.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '3.4.5.6')
        expectedResponse.answer.append(rrset)

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPathRegex(self):
        """
        DOH: HTTPPathRegex
        """
        name = 'http-path-regex.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '6.7.8.9')
        expectedResponse.answer.append(rrset)

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS-999', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPStatusAction200(self):
        """
        DOH: HTTPStatusAction 200 OK
        """
        name = 'http-status-action.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(receivedResponse, b'Plaintext answer')
        self.assertEqual(self._rcode, 200)
        self.assertIn('content-type: text/plain', self._response_headers.decode())

    def testHTTPStatusAction307(self):
        """
        DOH: HTTPStatusAction 307
        """
        name = 'http-status-action-redirect.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(self._rcode, 307)
        self.assertIn('location: https://doh.powerdns.org', self._response_headers.decode())

    def testHTTPLuaResponse(self):
        """
        DOH: Lua HTTP Response
        """
        name = 'http-lua.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(receivedResponse, b'It works!')
        self.assertEqual(self._rcode, 200)
        self.assertIn('content-type: text/plain', self._response_headers.decode())

    def testHTTPEarlyResponse(self):
        """
        DOH: HTTP Early Response
        """
        url = self._dohBaseURL + 'coffee'

        # the response map entry is checked with a GET first, then with an
        # empty POST, each over a new connection
        for usePost in (False, True):
            response_headers = BytesIO()
            conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
            conn.setopt(pycurl.URL, url)
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            conn.setopt(pycurl.CAINFO, self._caCert)
            conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
            if usePost:
                conn.setopt(pycurl.POST, True)
                conn.setopt(pycurl.POSTFIELDS, '')

            data = conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
            headers = response_headers.getvalue().decode()

            self.assertEqual(rcode, 418)
            self.assertEqual(data, b'C0FFEE')
            self.assertIn('foo: bar', headers)
            self.assertNotIn(self._customResponseHeader2, headers)
613
class TestDOHAddingECS(DNSDistDOHTest):
    """
    Tests against a DoH frontend served over HTTPS, with a backend
    configured with useClientSubnet=true and setECSOverride(true).
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    setECSOverride(true)
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH with ECS: Simple query
        """
        name = 'simple.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the ID is not part of what is being compared
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH with ECS: Existing EDNS
        """
        name = 'existing-edns.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH with ECS: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
708
class TestDOHOverHTTP(DNSDistDOHTest):
    """
    Tests against a DoH frontend configured without any certificate or
    key, reached over plain HTTP.
    """

    _dohServerPort = 8480
    _serverName = 'tls.tests.dnsdist.org'
    _dohBaseURL = ("http://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addDOHLocal("127.0.0.1:%s")
    """
    _config_params = ['_testServerPort', '_dohServerPort']

    def testDOHSimple(self):
        """
        DOH over HTTP: Simple query
        """
        name = 'simple.doh-over-http.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, useHTTPS=False)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the ID is not part of what is being compared
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH over HTTP: Simple POST query
        """
        name = 'simple-post.doh-over-http.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, useHTTPS=False)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)
770
class TestDOHWithCache(DNSDistDOHTest):
    """
    Tests against a DoH frontend served over HTTPS, with a packet cache
    attached to the default pool.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHCacheLargeAnswer(self):
        """
        DOH with cache: Check that we can cache (and retrieve) large answers
        """
        numberOfQueries = 10
        name = 'large.doh-with-cache.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        # we prepare a large answer
        content = ', '.join(str(i) * 50 for i in range(44))
        # pad up to 4096
        content = content + 'A' * 40

        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.TXT, content)
        response.answer.append(rrset)
        self.assertEqual(len(response.to_wire()), 4096)

        # first query to fill the cache
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # no response is queued for these queries and the responder queue
        # is not read, the same answer is expected nonetheless
        for _attempt in range(numberOfQueries):
            (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False)
            self.assertEqual(receivedResponse, response)