]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
Merge pull request #7984 from qvr/slave-renotify-metadata
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 import pycurl
8 from io import BytesIO
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
11
12 class DNSDistDOHTest(DNSDistTest):
13
14 @classmethod
15 def getDOHGetURL(cls, baseurl, query, rawQuery=False):
16 if rawQuery:
17 wire = query
18 else:
19 wire = query.to_wire()
20 param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
21 return baseurl + "?dns=" + param
22
23 @classmethod
24 def openDOHConnection(cls, port, caFile, timeout=2.0):
25 conn = pycurl.Curl()
26 conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
27
28 conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
29 "Accept: application/dns-message"])
30 return conn
31
32 @classmethod
33 def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=[]):
34 url = cls.getDOHGetURL(baseurl, query, rawQuery)
35 conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
36 response_headers = BytesIO()
37 #conn.setopt(pycurl.VERBOSE, True)
38 conn.setopt(pycurl.URL, url)
39 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
40 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
41 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
42 conn.setopt(pycurl.HTTPHEADER, customHeaders)
43 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
44 if caFile:
45 conn.setopt(pycurl.CAINFO, caFile)
46
47 if response:
48 cls._toResponderQueue.put(response, True, timeout)
49
50 receivedQuery = None
51 message = None
52 cls._response_headers = ''
53 data = conn.perform_rb()
54 cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
55 if cls._rcode == 200 and not rawResponse:
56 message = dns.message.from_wire(data)
57 elif rawResponse:
58 message = data
59
60 if useQueue and not cls._fromResponderQueue.empty():
61 receivedQuery = cls._fromResponderQueue.get(True, timeout)
62
63 cls._response_headers = response_headers.getvalue()
64 return (receivedQuery, message)
65
66 @classmethod
67 def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=[]):
68 url = baseurl
69 conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
70 response_headers = BytesIO()
71 #conn.setopt(pycurl.VERBOSE, True)
72 conn.setopt(pycurl.URL, url)
73 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
74 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
75 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
76 conn.setopt(pycurl.HTTPHEADER, customHeaders)
77 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
78 conn.setopt(pycurl.POST, True)
79 data = query
80 if not rawQuery:
81 data = data.to_wire()
82
83 conn.setopt(pycurl.POSTFIELDS, data)
84
85 if caFile:
86 conn.setopt(pycurl.CAINFO, caFile)
87
88 if response:
89 cls._toResponderQueue.put(response, True, timeout)
90
91 receivedQuery = None
92 message = None
93 cls._response_headers = ''
94 data = conn.perform_rb()
95 cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
96 if cls._rcode == 200 and not rawResponse:
97 message = dns.message.from_wire(data)
98 elif rawResponse:
99 message = data
100
101 if useQueue and not cls._fromResponderQueue.empty():
102 receivedQuery = cls._fromResponderQueue.get(True, timeout)
103
104 cls._response_headers = response_headers.getvalue()
105 return (receivedQuery, message)
106
107 # @classmethod
108 # def openDOHConnection(cls, port, caFile, timeout=2.0):
109 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
110 # sslctx.load_verify_locations(caFile)
111 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
112
113 # @classmethod
114 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
115 # url = cls.getDOHGetURL(baseurl, query)
116
117 # if response:
118 # cls._toResponderQueue.put(response, True, timeout)
119
120 # conn.request('GET', url)
121
122 # @classmethod
123 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
124 # message = None
125 # data = conn.get_response()
126 # if data:
127 # data = data.read()
128 # if data:
129 # message = dns.message.from_wire(data)
130
131 # if useQueue and not cls._fromResponderQueue.empty():
132 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
133 # return (receivedQuery, message)
134 # else:
135 # return message
136
137 class TestDOH(DNSDistDOHTest):
138
139 _serverKey = 'server.key'
140 _serverCert = 'server.chain'
141 _serverName = 'tls.tests.dnsdist.org'
142 _caCert = 'ca.pem'
143 _dohServerPort = 8443
144 _customResponseHeader1 = 'access-control-allow-origin: *'
145 _customResponseHeader2 = 'user-agent: derp'
146 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
147 _config_template = """
148 newServer{address="127.0.0.1:%s"}
149
150 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp"}})
151 dohFE = getDOHFrontend(0)
152 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['foo']='bar'})})
153
154 addAction("drop.doh.tests.powerdns.com.", DropAction())
155 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
156 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
157 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
158 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
159 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
160 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
161 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
162
163 function dohHandler(dq)
164 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
165 local foundct = false
166 for key,value in pairs(dq:getHTTPHeaders()) do
167 if key == 'content-type' and value == 'application/dns-message' then
168 foundct = true
169 break
170 end
171 end
172 if foundct then
173 dq:setHTTPResponse(200, 'It works!', 'text/plain')
174 dq.dh:setQR(true)
175 return DNSAction.HeaderModify
176 end
177 end
178 return DNSAction.None
179 end
180 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
181 """
182 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
183
184 def testDOHSimple(self):
185 """
186 DOH: Simple query
187 """
188 name = 'simple.doh.tests.powerdns.com.'
189 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
190 query.id = 0
191 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
192 expectedQuery.id = 0
193 response = dns.message.make_response(query)
194 rrset = dns.rrset.from_text(name,
195 3600,
196 dns.rdataclass.IN,
197 dns.rdatatype.A,
198 '127.0.0.1')
199 response.answer.append(rrset)
200
201 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
202 self.assertTrue(receivedQuery)
203 self.assertTrue(receivedResponse)
204 receivedQuery.id = expectedQuery.id
205 self.assertEquals(expectedQuery, receivedQuery)
206 self.assertTrue((self._customResponseHeader1) in self._response_headers.decode())
207 self.assertTrue((self._customResponseHeader2) in self._response_headers.decode())
208 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
209 self.assertEquals(response, receivedResponse)
210
211 def testDOHSimplePOST(self):
212 """
213 DOH: Simple POST query
214 """
215 name = 'simple-post.doh.tests.powerdns.com.'
216 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
217 query.id = 0
218 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
219 expectedQuery.id = 0
220 response = dns.message.make_response(query)
221 rrset = dns.rrset.from_text(name,
222 3600,
223 dns.rdataclass.IN,
224 dns.rdatatype.A,
225 '127.0.0.1')
226 response.answer.append(rrset)
227
228 (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
229 self.assertTrue(receivedQuery)
230 self.assertTrue(receivedResponse)
231 receivedQuery.id = expectedQuery.id
232 self.assertEquals(expectedQuery, receivedQuery)
233 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
234 self.assertEquals(response, receivedResponse)
235
236 def testDOHExistingEDNS(self):
237 """
238 DOH: Existing EDNS
239 """
240 name = 'existing-edns.doh.tests.powerdns.com.'
241 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
242 query.id = 0
243 response = dns.message.make_response(query)
244 rrset = dns.rrset.from_text(name,
245 3600,
246 dns.rdataclass.IN,
247 dns.rdatatype.A,
248 '127.0.0.1')
249 response.answer.append(rrset)
250
251 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
252 self.assertTrue(receivedQuery)
253 self.assertTrue(receivedResponse)
254 receivedQuery.id = query.id
255 self.assertEquals(query, receivedQuery)
256 self.assertEquals(response, receivedResponse)
257 self.checkQueryEDNSWithoutECS(query, receivedQuery)
258 self.checkResponseEDNSWithoutECS(response, receivedResponse)
259
260 def testDOHExistingECS(self):
261 """
262 DOH: Existing EDNS Client Subnet
263 """
264 name = 'existing-ecs.doh.tests.powerdns.com.'
265 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
266 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
267 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
268 query.id = 0
269 response = dns.message.make_response(query)
270 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
271 rrset = dns.rrset.from_text(name,
272 3600,
273 dns.rdataclass.IN,
274 dns.rdatatype.A,
275 '127.0.0.1')
276 response.answer.append(rrset)
277
278 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
279 self.assertTrue(receivedQuery)
280 self.assertTrue(receivedResponse)
281 receivedQuery.id = query.id
282 self.assertEquals(query, receivedQuery)
283 self.assertEquals(response, receivedResponse)
284 self.checkQueryEDNSWithECS(query, receivedQuery)
285 self.checkResponseEDNSWithECS(response, receivedResponse)
286
287 def testDropped(self):
288 """
289 DOH: Dropped query
290 """
291 name = 'drop.doh.tests.powerdns.com.'
292 query = dns.message.make_query(name, 'A', 'IN')
293 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
294 self.assertEquals(receivedResponse, None)
295
296 def testRefused(self):
297 """
298 DOH: Refused
299 """
300 name = 'refused.doh.tests.powerdns.com.'
301 query = dns.message.make_query(name, 'A', 'IN')
302 query.id = 0
303 expectedResponse = dns.message.make_response(query)
304 expectedResponse.set_rcode(dns.rcode.REFUSED)
305
306 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
307 self.assertEquals(receivedResponse, expectedResponse)
308
309 def testSpoof(self):
310 """
311 DOH: Spoofed
312 """
313 name = 'spoof.doh.tests.powerdns.com.'
314 query = dns.message.make_query(name, 'A', 'IN')
315 query.id = 0
316 query.flags &= ~dns.flags.RD
317 expectedResponse = dns.message.make_response(query)
318 rrset = dns.rrset.from_text(name,
319 3600,
320 dns.rdataclass.IN,
321 dns.rdatatype.A,
322 '1.2.3.4')
323 expectedResponse.answer.append(rrset)
324
325 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
326 self.assertEquals(receivedResponse, expectedResponse)
327
328 def testDOHInvalid(self):
329 """
330 DOH: Invalid query
331 """
332 name = 'invalid.doh.tests.powerdns.com.'
333 invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
334 invalidQuery.id = 0
335 # first an invalid query
336 invalidQuery = invalidQuery.to_wire()
337 invalidQuery = invalidQuery[:-5]
338 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
339 self.assertEquals(receivedResponse, None)
340
341 # and now a valid one
342 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
343 query.id = 0
344 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
345 expectedQuery.id = 0
346 response = dns.message.make_response(query)
347 rrset = dns.rrset.from_text(name,
348 3600,
349 dns.rdataclass.IN,
350 dns.rdatatype.A,
351 '127.0.0.1')
352 response.answer.append(rrset)
353 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
354 self.assertTrue(receivedQuery)
355 self.assertTrue(receivedResponse)
356 receivedQuery.id = expectedQuery.id
357 self.assertEquals(expectedQuery, receivedQuery)
358 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
359 self.assertEquals(response, receivedResponse)
360
361 def testDOHWithoutQuery(self):
362 """
363 DOH: Empty GET query
364 """
365 name = 'empty-get.doh.tests.powerdns.com.'
366 url = self._dohBaseURL
367 conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
368 conn.setopt(pycurl.URL, url)
369 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
370 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
371 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
372 conn.setopt(pycurl.CAINFO, self._caCert)
373 data = conn.perform_rb()
374 rcode = conn.getinfo(pycurl.RESPONSE_CODE)
375 self.assertEquals(rcode, 400)
376
377 def testDOHEmptyPOST(self):
378 """
379 DOH: Empty POST query
380 """
381 name = 'empty-post.doh.tests.powerdns.com.'
382
383 (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
384 self.assertEquals(receivedResponse, None)
385
386 # and now a valid one
387 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
388 query.id = 0
389 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
390 expectedQuery.id = 0
391 response = dns.message.make_response(query)
392 rrset = dns.rrset.from_text(name,
393 3600,
394 dns.rdataclass.IN,
395 dns.rdatatype.A,
396 '127.0.0.1')
397 response.answer.append(rrset)
398 (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
399 self.assertTrue(receivedQuery)
400 self.assertTrue(receivedResponse)
401 receivedQuery.id = expectedQuery.id
402 self.assertEquals(expectedQuery, receivedQuery)
403 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
404 self.assertEquals(response, receivedResponse)
405
406 def testHeaderRule(self):
407 """
408 DOH: HeaderRule
409 """
410 name = 'header-rule.doh.tests.powerdns.com.'
411 query = dns.message.make_query(name, 'A', 'IN')
412 query.id = 0
413 query.flags &= ~dns.flags.RD
414 expectedResponse = dns.message.make_response(query)
415 rrset = dns.rrset.from_text(name,
416 3600,
417 dns.rdataclass.IN,
418 dns.rdatatype.A,
419 '2.3.4.5')
420 expectedResponse.answer.append(rrset)
421
422 # this header should match
423 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
424 self.assertEquals(receivedResponse, expectedResponse)
425
426 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
427 expectedQuery.flags &= ~dns.flags.RD
428 expectedQuery.id = 0
429 response = dns.message.make_response(query)
430 rrset = dns.rrset.from_text(name,
431 3600,
432 dns.rdataclass.IN,
433 dns.rdatatype.A,
434 '127.0.0.1')
435 response.answer.append(rrset)
436
437 # this content of the header should NOT match
438 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
439 self.assertTrue(receivedQuery)
440 self.assertTrue(receivedResponse)
441 receivedQuery.id = expectedQuery.id
442 self.assertEquals(expectedQuery, receivedQuery)
443 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
444 self.assertEquals(response, receivedResponse)
445
446 def testHTTPPath(self):
447 """
448 DOH: HTTPPath
449 """
450 name = 'http-path.doh.tests.powerdns.com.'
451 query = dns.message.make_query(name, 'A', 'IN')
452 query.id = 0
453 query.flags &= ~dns.flags.RD
454 expectedResponse = dns.message.make_response(query)
455 rrset = dns.rrset.from_text(name,
456 3600,
457 dns.rdataclass.IN,
458 dns.rdatatype.A,
459 '3.4.5.6')
460 expectedResponse.answer.append(rrset)
461
462 # this path should match
463 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
464 self.assertEquals(receivedResponse, expectedResponse)
465
466 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
467 expectedQuery.id = 0
468 expectedQuery.flags &= ~dns.flags.RD
469 response = dns.message.make_response(query)
470 rrset = dns.rrset.from_text(name,
471 3600,
472 dns.rdataclass.IN,
473 dns.rdatatype.A,
474 '127.0.0.1')
475 response.answer.append(rrset)
476
477 # this path should NOT match
478 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
479 self.assertTrue(receivedQuery)
480 self.assertTrue(receivedResponse)
481 receivedQuery.id = expectedQuery.id
482 self.assertEquals(expectedQuery, receivedQuery)
483 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
484 self.assertEquals(response, receivedResponse)
485
486 def testHTTPPathRegex(self):
487 """
488 DOH: HTTPPathRegex
489 """
490 name = 'http-path-regex.doh.tests.powerdns.com.'
491 query = dns.message.make_query(name, 'A', 'IN')
492 query.id = 0
493 query.flags &= ~dns.flags.RD
494 expectedResponse = dns.message.make_response(query)
495 rrset = dns.rrset.from_text(name,
496 3600,
497 dns.rdataclass.IN,
498 dns.rdatatype.A,
499 '6.7.8.9')
500 expectedResponse.answer.append(rrset)
501
502 # this path should match
503 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS-999', caFile=self._caCert, query=query, response=None, useQueue=False)
504 self.assertEquals(receivedResponse, expectedResponse)
505
506 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
507 expectedQuery.id = 0
508 expectedQuery.flags &= ~dns.flags.RD
509 response = dns.message.make_response(query)
510 rrset = dns.rrset.from_text(name,
511 3600,
512 dns.rdataclass.IN,
513 dns.rdatatype.A,
514 '127.0.0.1')
515 response.answer.append(rrset)
516
517 # this path should NOT match
518 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
519 self.assertTrue(receivedQuery)
520 self.assertTrue(receivedResponse)
521 receivedQuery.id = expectedQuery.id
522 self.assertEquals(expectedQuery, receivedQuery)
523 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
524 self.assertEquals(response, receivedResponse)
525
526 def testHTTPStatusAction200(self):
527 """
528 DOH: HTTPStatusAction 200 OK
529 """
530 name = 'http-status-action.doh.tests.powerdns.com.'
531 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
532 query.id = 0
533
534 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
535 self.assertTrue(receivedResponse)
536 self.assertEquals(receivedResponse, b'Plaintext answer')
537 self.assertEquals(self._rcode, 200)
538 self.assertTrue('content-type: text/plain' in self._response_headers.decode())
539
540 def testHTTPStatusAction307(self):
541 """
542 DOH: HTTPStatusAction 307
543 """
544 name = 'http-status-action-redirect.doh.tests.powerdns.com.'
545 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
546 query.id = 0
547
548 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
549 self.assertTrue(receivedResponse)
550 self.assertEquals(self._rcode, 307)
551 self.assertTrue('location: https://doh.powerdns.org' in self._response_headers.decode())
552
553 def testHTTPLuaResponse(self):
554 """
555 DOH: Lua HTTP Response
556 """
557 name = 'http-lua.doh.tests.powerdns.com.'
558 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
559 query.id = 0
560
561 (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
562 self.assertTrue(receivedResponse)
563 self.assertEquals(receivedResponse, b'It works!')
564 self.assertEquals(self._rcode, 200)
565 self.assertTrue('content-type: text/plain' in self._response_headers.decode())
566
567 def testHTTPEarlyResponse(self):
568 """
569 DOH: HTTP Early Response
570 """
571 response_headers = BytesIO()
572 url = self._dohBaseURL + 'coffee'
573 conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
574 conn.setopt(pycurl.URL, url)
575 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
576 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
577 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
578 conn.setopt(pycurl.CAINFO, self._caCert)
579 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
580 data = conn.perform_rb()
581 rcode = conn.getinfo(pycurl.RESPONSE_CODE)
582 headers = response_headers.getvalue().decode()
583
584 self.assertEquals(rcode, 418)
585 self.assertEquals(data, b'C0FFEE')
586 self.assertIn('foo: bar', headers)
587 self.assertNotIn(self._customResponseHeader2, headers)
588
589 response_headers = BytesIO()
590 conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
591 conn.setopt(pycurl.URL, url)
592 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
593 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
594 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
595 conn.setopt(pycurl.CAINFO, self._caCert)
596 conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
597 conn.setopt(pycurl.POST, True)
598 data = ''
599 conn.setopt(pycurl.POSTFIELDS, data)
600
601 data = conn.perform_rb()
602 rcode = conn.getinfo(pycurl.RESPONSE_CODE)
603 headers = response_headers.getvalue().decode()
604 self.assertEquals(rcode, 418)
605 self.assertEquals(data, b'C0FFEE')
606 self.assertIn('foo: bar', headers)
607 self.assertNotIn(self._customResponseHeader2, headers)
608
609 class TestDOHAddingECS(DNSDistDOHTest):
610
611 _serverKey = 'server.key'
612 _serverCert = 'server.chain'
613 _serverName = 'tls.tests.dnsdist.org'
614 _caCert = 'ca.pem'
615 _dohServerPort = 8443
616 _serverName = 'tls.tests.dnsdist.org'
617 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
618 _config_template = """
619 newServer{address="127.0.0.1:%s", useClientSubnet=true}
620 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
621 setECSOverride(true)
622 """
623 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
624
625 def testDOHSimple(self):
626 """
627 DOH with ECS: Simple query
628 """
629 name = 'simple.doh-ecs.tests.powerdns.com.'
630 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
631 query.id = 0
632 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
633 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
634 response = dns.message.make_response(query)
635 rrset = dns.rrset.from_text(name,
636 3600,
637 dns.rdataclass.IN,
638 dns.rdatatype.A,
639 '127.0.0.1')
640 response.answer.append(rrset)
641
642 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
643 self.assertTrue(receivedQuery)
644 self.assertTrue(receivedResponse)
645 expectedQuery.id = receivedQuery.id
646 self.assertEquals(expectedQuery, receivedQuery)
647 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
648 self.assertEquals(response, receivedResponse)
649 self.checkResponseNoEDNS(response, receivedResponse)
650
651 def testDOHExistingEDNS(self):
652 """
653 DOH with ECS: Existing EDNS
654 """
655 name = 'existing-edns.doh-ecs.tests.powerdns.com.'
656 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
657 query.id = 0
658 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
659 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
660 response = dns.message.make_response(query)
661 rrset = dns.rrset.from_text(name,
662 3600,
663 dns.rdataclass.IN,
664 dns.rdatatype.A,
665 '127.0.0.1')
666 response.answer.append(rrset)
667
668 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
669 self.assertTrue(receivedQuery)
670 self.assertTrue(receivedResponse)
671 receivedQuery.id = expectedQuery.id
672 self.assertEquals(expectedQuery, receivedQuery)
673 self.assertEquals(response, receivedResponse)
674 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
675 self.checkResponseEDNSWithoutECS(response, receivedResponse)
676
677 def testDOHExistingECS(self):
678 """
679 DOH with ECS: Existing EDNS Client Subnet
680 """
681 name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
682 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
683 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
684 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
685 query.id = 0
686 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
687 response = dns.message.make_response(query)
688 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
689 rrset = dns.rrset.from_text(name,
690 3600,
691 dns.rdataclass.IN,
692 dns.rdatatype.A,
693 '127.0.0.1')
694 response.answer.append(rrset)
695
696 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
697 self.assertTrue(receivedQuery)
698 self.assertTrue(receivedResponse)
699 receivedQuery.id = expectedQuery.id
700 self.assertEquals(expectedQuery, receivedQuery)
701 self.assertEquals(response, receivedResponse)
702 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
703 self.checkResponseEDNSWithECS(response, receivedResponse)