]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
Merge pull request #8303 from rgacogne/dnsdist-fix-invalid-secpoll-answer-msg
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 import pycurl
8 from io import BytesIO
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
11
class DNSDistDOHTest(DNSDistTest):
    """
    pycurl-based helpers shared by the DNS-over-HTTP(S) regression tests.

    After each request sent through sendDOHQuery() / sendDOHPostQuery(), the
    HTTP status code is available in `cls._rcode` and the raw response
    headers (bytes) in `cls._response_headers`.

    The helpers rely on `_toResponderQueue` and `_fromResponderQueue`, which
    are not defined here (presumably provided by DNSDistTest).
    """

    @classmethod
    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
        """
        Return the URL of a DoH GET request carrying `query`.

        `query` is serialized with to_wire() unless `rawQuery` is true, in
        which case it is used as-is. The wire data is base64url-encoded,
        stripped of its trailing '=' padding and appended to `baseurl` as
        the `dns` query-string parameter.
        """
        wire = query if rawQuery else query.to_wire()
        param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
        return baseurl + "?dns=" + param

    @classmethod
    def openDOHConnection(cls, port, caFile, timeout=2.0):
        """
        Return a new curl handle asking for HTTP/2 and carrying the DoH
        `Content-type` / `Accept` request headers.

        NOTE: `port`, `caFile` and `timeout` are accepted but not used here;
        the URL and TLS options are set by the callers on the returned handle.
        """
        conn = pycurl.Curl()
        conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)

        conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                         "Accept: application/dns-message"])
        return conn

    @classmethod
    def _setupDOHRequest(cls, conn, url, port, servername, caFile, customHeaders, useHTTPS):
        """
        Apply the options common to GET and POST requests to `conn` and
        return the BytesIO buffer that will collect the response headers.
        """
        response_headers = BytesIO()
        #conn.setopt(pycurl.VERBOSE, True)
        conn.setopt(pycurl.URL, url)
        # make curl connect to 127.0.0.1 while still using `servername` in the URL
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
        if useHTTPS:
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            if caFile:
                conn.setopt(pycurl.CAINFO, caFile)

        # an omitted customHeaders behaves exactly like the former [] default
        conn.setopt(pycurl.HTTPHEADER, [] if customHeaders is None else customHeaders)
        conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
        return response_headers

    @classmethod
    def _performDOHRequest(cls, conn, response_headers, response, timeout, useQueue, rawResponse):
        """
        Queue `response` for the responder (if any), perform the transfer
        and return a (receivedQuery, message) tuple.

        `message` is the raw body when `rawResponse` is true, the parsed DNS
        message when the HTTP status is 200, and None otherwise.
        `receivedQuery` is taken from `_fromResponderQueue` when `useQueue`
        is true and the queue is not empty, and is None otherwise.
        The curl handle is closed once the transfer is over, even on error.
        """
        if response:
            cls._toResponderQueue.put(response, True, timeout)

        receivedQuery = None
        message = None
        # bytes, like the value stored below, so that callers can always decode() it
        cls._response_headers = b''
        try:
            data = conn.perform_rb()
            # the status code has to be read before the handle is closed
            cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            conn.close()

        if rawResponse:
            message = data
        elif cls._rcode == 200:
            message = dns.message.from_wire(data)

        if useQueue and not cls._fromResponderQueue.empty():
            receivedQuery = cls._fromResponderQueue.get(True, timeout)

        cls._response_headers = response_headers.getvalue()
        return (receivedQuery, message)

    @classmethod
    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=None, useHTTPS=True):
        """
        Send `query` as a DoH GET request and return (receivedQuery, message).

        `port` / `servername`: used to pin `servername:port` to 127.0.0.1.
        `baseurl`: URL the `dns` parameter is appended to.
        `response`: if set, put on `_toResponderQueue` before the request.
        `timeout`: timeout used for the queue operations.
        `caFile`: CA file used to verify the server when `useHTTPS` is true.
        `rawQuery`: `query` is already in wire format.
        `rawResponse`: return the raw body instead of a parsed DNS message.
        `customHeaders`: list of request headers passed to curl (none by default).
        """
        url = cls.getDOHGetURL(baseurl, query, rawQuery)
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        response_headers = cls._setupDOHRequest(conn, url, port, servername, caFile, customHeaders, useHTTPS)
        return cls._performDOHRequest(conn, response_headers, response, timeout, useQueue, rawResponse)

    @classmethod
    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=None, useHTTPS=True):
        """
        Send `query` as the body of a DoH POST request to `baseurl` and
        return (receivedQuery, message).

        The parameters have the same meaning as for sendDOHQuery(); `query`
        is serialized with to_wire() unless `rawQuery` is true.
        """
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        response_headers = cls._setupDOHRequest(conn, baseurl, port, servername, caFile, customHeaders, useHTTPS)
        conn.setopt(pycurl.POST, True)
        body = query if rawQuery else query.to_wire()
        conn.setopt(pycurl.POSTFIELDS, body)
        return cls._performDOHRequest(conn, response_headers, response, timeout, useQueue, rawResponse)
109
110 # @classmethod
111 # def openDOHConnection(cls, port, caFile, timeout=2.0):
112 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
113 # sslctx.load_verify_locations(caFile)
114 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
115
116 # @classmethod
117 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
118 # url = cls.getDOHGetURL(baseurl, query)
119
120 # if response:
121 # cls._toResponderQueue.put(response, True, timeout)
122
123 # conn.request('GET', url)
124
125 # @classmethod
126 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
127 # message = None
128 # data = conn.get_response()
129 # if data:
130 # data = data.read()
131 # if data:
132 # message = dns.message.from_wire(data)
133
134 # if useQueue and not cls._fromResponderQueue.empty():
135 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
136 # return (receivedQuery, message)
137 # else:
138 # return message
139
class TestDOH(DNSDistDOHTest):
    """
    DoH tests against a HTTPS frontend configured with custom response
    headers, an early-response map entry for '/coffee' and a set of
    drop / rcode / spoof / HTTP-specific rules (see `_config_template`).
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    # these two match the customResponseHeaders set on the frontend below
    _customResponseHeader1 = 'access-control-allow-origin: *'
    _customResponseHeader2 = 'user-agent: derp'
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp"}})
    dohFE = getDOHFrontend(0)
    dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['foo']='bar'})})

    addAction("drop.doh.tests.powerdns.com.", DropAction())
    addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
    addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
    addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
    addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
    addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))

    function dohHandler(dq)
      if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
        local foundct = false
        for key,value in pairs(dq:getHTTPHeaders()) do
          if key == 'content-type' and value == 'application/dns-message' then
            foundct = true
            break
          end
        end
        if foundct then
          dq:setHTTPResponse(200, 'It works!', 'text/plain')
          dq.dh:setQR(true)
          return DNSAction.HeaderModify
        end
      end
      return DNSAction.None
    end
    addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
    """
    # attribute names, presumably substituted in order into the placeholders of the template
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']

    def testDOHSimple(self):
        """
        DOH: Simple query
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # the query seen by the responder is expected to carry EDNS with a 4096 payload
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        headers = self._response_headers.decode()
        self.assertIn(self._customResponseHeader1, headers)
        self.assertIn(self._customResponseHeader2, headers)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH: Simple POST query
        """
        name = 'simple-post.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH: Existing EDNS
        """
        name = 'existing-edns.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithoutECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)

    def testDropped(self):
        """
        DOH: Dropped query
        """
        name = 'drop.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        # sendDOHQuery() only returns a message when the HTTP status is 200
        self.assertEqual(receivedResponse, None)

    def testRefused(self):
        """
        DOH: Refused
        """
        name = 'refused.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOH: Spoofed
        """
        name = 'spoof.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOHInvalid(self):
        """
        DOH: Invalid query
        """
        name = 'invalid.doh.tests.powerdns.com.'
        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        invalidQuery.id = 0
        # first an invalid query: the wire format truncated by five bytes
        invalidQuery = invalidQuery.to_wire()
        invalidQuery = invalidQuery[:-5]
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
        self.assertEqual(receivedResponse, None)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHWithoutQuery(self):
        """
        DOH: Empty GET query
        """
        # a GET on the base URL, without any 'dns' parameter
        url = self._dohBaseURL
        conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
        conn.setopt(pycurl.URL, url)
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        conn.setopt(pycurl.CAINFO, self._caCert)
        conn.perform_rb()
        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        conn.close()
        self.assertEqual(rcode, 400)

    def testDOHEmptyPOST(self):
        """
        DOH: Empty POST query
        """
        name = 'empty-post.doh.tests.powerdns.com.'

        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
        self.assertEqual(receivedResponse, None)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHeaderRule(self):
        """
        DOH: HeaderRule
        """
        name = 'header-rule.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '2.3.4.5')
        expectedResponse.answer.append(rrset)

        # this header should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # this content of the header should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPath(self):
        """
        DOH: HTTPPath
        """
        name = 'http-path.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '3.4.5.6')
        expectedResponse.answer.append(rrset)

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPathRegex(self):
        """
        DOH: HTTPPathRegex
        """
        name = 'http-path-regex.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '6.7.8.9')
        expectedResponse.answer.append(rrset)

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS-999', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPStatusAction200(self):
        """
        DOH: HTTPStatusAction 200 OK
        """
        query = dns.message.make_query('http-status-action.doh.tests.powerdns.com.', 'A', 'IN', use_edns=False)
        query.id = 0

        # rawResponse: the body is plain text, not a DNS message
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(receivedResponse, b'Plaintext answer')
        self.assertEqual(self._rcode, 200)
        self.assertIn('content-type: text/plain', self._response_headers.decode())

    def testHTTPStatusAction307(self):
        """
        DOH: HTTPStatusAction 307
        """
        query = dns.message.make_query('http-status-action-redirect.doh.tests.powerdns.com.', 'A', 'IN', use_edns=False)
        query.id = 0

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(self._rcode, 307)
        self.assertIn('location: https://doh.powerdns.org', self._response_headers.decode())

    def testHTTPLuaResponse(self):
        """
        DOH: Lua HTTP Response
        """
        query = dns.message.make_query('http-lua.doh.tests.powerdns.com.', 'A', 'IN', use_edns=False)
        query.id = 0

        # sent as a POST: dohHandler() in the configuration requires an empty query string
        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(receivedResponse, b'It works!')
        self.assertEqual(self._rcode, 200)
        self.assertIn('content-type: text/plain', self._response_headers.decode())

    def testHTTPEarlyResponse(self):
        """
        DOH: HTTP Early Response
        """
        url = self._dohBaseURL + 'coffee'

        # the same checks are done for a GET first, then for a POST with an empty body
        for usePost in (False, True):
            response_headers = BytesIO()
            conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
            conn.setopt(pycurl.URL, url)
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            conn.setopt(pycurl.CAINFO, self._caCert)
            conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
            if usePost:
                conn.setopt(pycurl.POST, True)
                conn.setopt(pycurl.POSTFIELDS, '')

            data = conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
            conn.close()
            headers = response_headers.getvalue().decode()

            self.assertEqual(rcode, 418)
            self.assertEqual(data, b'C0FFEE')
            self.assertIn('foo: bar', headers)
            self.assertNotIn(self._customResponseHeader2, headers)
611
class TestDOHAddingECS(DNSDistDOHTest):
    """
    DoH tests against a HTTPS frontend whose backend is declared with
    useClientSubnet=true, with setECSOverride(true): the queries reaching
    the responder are expected to carry a 127.0.0.0/24 ECS option.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    setECSOverride(true)
    """
    # attribute names, presumably substituted in order into the placeholders of the template
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH with ECS: Simple query
        """
        name = 'simple.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the ID is not part of what is being compared here
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH with ECS: Existing EDNS
        """
        name = 'existing-edns.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH with ECS: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        # the ECS option sent by the client is expected to be replaced by the rewritten one
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
706
class TestDOHOverHTTP(DNSDistDOHTest):
    """
    DoH tests against a frontend declared without certificate and key,
    queried over plain HTTP (useHTTPS=False, no CA file).
    """

    _dohServerPort = 8480
    _serverName = 'tls.tests.dnsdist.org'
    _dohBaseURL = ("http://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addDOHLocal("127.0.0.1:%s")
    """
    # attribute names, presumably substituted in order into the placeholders of the template
    _config_params = ['_testServerPort', '_dohServerPort']

    def testDOHSimple(self):
        """
        DOH over HTTP: Simple query
        """
        name = 'simple.doh-over-http.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, useHTTPS=False)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the ID is not part of what is being compared here
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH over HTTP: Simple POST query
        """
        name = 'simple-post.doh-over-http.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, useHTTPS=False)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)