8 import clientsubnetoption
9 from dnsdisttests
import DNSDistTest
12 from io
import BytesIO
13 #from hyper import HTTP20Connection
14 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
16 @unittest.skipIf('SKIP_DOH_TESTS' in os
.environ
, 'DNS over HTTPS tests are disabled')
17 class DNSDistDOHTest(DNSDistTest
):
# Build the URL of a DoH GET request: `baseurl` followed by a `?dns=`
# parameter that carries the query as URL-safe base64 text with the
# trailing '=' padding removed.
# NOTE(review): `rawQuery` is not read by any line visible in this listing;
# original lines 21-23 are absent and presumably choose between an already
# serialized `query` and `query.to_wire()` -- confirm against the full file.
20 def getDOHGetURL(cls
, baseurl
, query
, rawQuery
=False):
24 wire
= query
.to_wire()
# The base64 output is decoded to text so it can be concatenated into the URL.
25 param
= base64
.urlsafe_b64encode(wire
).decode('UTF8').rstrip('=')
26 return baseurl
+ "?dns=" + param
# Prepare the connection object used by the DoH request helpers.
# The visible lines select HTTP/2 and install the `Content-type` and `Accept`
# headers for `application/dns-message`.
# NOTE(review): the creation of `conn`, any use of `port`, `caFile` and
# `timeout`, and the return statement are not visible in this listing
# (original lines 30, 32 and 35-36 are absent) -- confirm against the full file.
29 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
31 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
33 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
34 "Accept: application/dns-message"])
# Send `query` to the DoH endpoint with an HTTP GET and return the tuple
# (receivedQuery, message).
#   port, servername: used for the curl RESOLVE entry that pins
#       servername:port to 127.0.0.1
#   baseurl, query, rawQuery: handed to getDOHGetURL() to build the request URL
#   response: put on cls._toResponderQueue (blocking, with `timeout`) before
#       the request is performed
#   caFile: passed to openDOHConnection() and installed as CAINFO
#   useQueue: when true and cls._fromResponderQueue is not empty, one item is
#       read from that queue into receivedQuery
#   rawResponse: when true, the body is not parsed with dns.message.from_wire()
#   customHeaders: installed as the request's HTTPHEADER list
# Side effects: cls._rcode receives the HTTP response code and
# cls._response_headers the raw response header bytes.
# NOTE(review): original lines 45, 48, 53-54, 56-58 and 64-66 are absent from
# this listing; they presumably hold the conditions on useHTTPS / caFile /
# response, the initial values of receivedQuery and message, and the
# rawResponse branch -- confirm against the full file.
# NOTE(review): `customHeaders=[]` is a mutable default argument; the visible
# lines only pass it to setopt() and never mutate it.
38 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
39 url
= cls
.getDOHGetURL(baseurl
, query
, rawQuery
)
40 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
# Response headers are accumulated here through the HEADERFUNCTION callback.
41 response_headers
= BytesIO()
42 #conn.setopt(pycurl.VERBOSE, True)
43 conn
.setopt(pycurl
.URL
, url
)
# Resolve the server name locally so the request goes to 127.0.0.1.
44 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
# Verify both the certificate chain and the host name.
46 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
47 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
49 conn
.setopt(pycurl
.CAINFO
, caFile
)
51 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
52 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
55 cls
._toResponderQueue
.put(response
, True, timeout
)
# Reset the stored headers before the request; the real value is stored below.
59 cls
._response
_headers
= ''
60 data
= conn
.perform_rb()
61 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
# Only a 200 answer is parsed as a DNS message, unless the raw body was requested.
62 if cls
._rcode
== 200 and not rawResponse
:
63 message
= dns
.message
.from_wire(data
)
67 if useQueue
and not cls
._fromResponderQueue
.empty():
68 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
70 cls
._response
_headers
= response_headers
.getvalue()
71 return (receivedQuery
, message
)
# Send a query to the DoH endpoint with an HTTP POST and return the tuple
# (receivedQuery, message).
#   port, servername: used for the curl RESOLVE entry that pins
#       servername:port to 127.0.0.1
#   response: put on cls._toResponderQueue (blocking, with `timeout`) before
#       the request is performed
#   caFile: passed to openDOHConnection() and installed as CAINFO
#   useQueue: when true and cls._fromResponderQueue is not empty, one item is
#       read from that queue into receivedQuery
#   rawResponse: when true, the body is not parsed with dns.message.from_wire()
#   customHeaders: installed as the request's HTTPHEADER list
# Side effects: cls._rcode receives the HTTP response code and
# cls._response_headers the raw response header bytes.
# NOTE(review): `url` and the POST body `data` are used below but their
# definitions are not visible (original lines 75 and 90-93 are absent), and
# neither are the uses of baseurl / query / rawQuery / useHTTPS, the initial
# values of receivedQuery and message, or the rawResponse branch -- confirm
# against the full file.
# NOTE(review): `customHeaders=[]` is a mutable default argument; the visible
# lines only pass it to setopt() and never mutate it.
74 def sendDOHPostQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
76 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
# Response headers are accumulated here through the HEADERFUNCTION callback.
77 response_headers
= BytesIO()
78 #conn.setopt(pycurl.VERBOSE, True)
79 conn
.setopt(pycurl
.URL
, url
)
# Resolve the server name locally so the request goes to 127.0.0.1.
80 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
# Verify both the certificate chain and the host name.
82 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
83 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
85 conn
.setopt(pycurl
.CAINFO
, caFile
)
87 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
88 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
# Switch the request to POST and attach the body.
89 conn
.setopt(pycurl
.POST
, True)
94 conn
.setopt(pycurl
.POSTFIELDS
, data
)
97 cls
._toResponderQueue
.put(response
, True, timeout
)
# Reset the stored headers before the request; the real value is stored below.
101 cls
._response
_headers
= ''
# From here on `data` holds the response body, not the request body.
102 data
= conn
.perform_rb()
103 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
# Only a 200 answer is parsed as a DNS message, unless the raw body was requested.
104 if cls
._rcode
== 200 and not rawResponse
:
105 message
= dns
.message
.from_wire(data
)
109 if useQueue
and not cls
._fromResponderQueue
.empty():
110 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
112 cls
._response
_headers
= response_headers
.getvalue()
113 return (receivedQuery
, message
)
def getHeaderValue(self, name):
    """Return the value of response header `name`, or None when it is absent.

    The lookup runs over the raw header block stored in
    `self._response_headers`. Header names are compared case-insensitively
    and the first matching line wins.
    """
    raw = self._response_headers
    # The attribute holds bytes once a request has completed, but it is
    # reset to an empty str before each request, so accept both.
    text = raw.decode() if isinstance(raw, bytes) else raw
    wanted = name.lower()
    for header in text.splitlines(False):
        # Split on the first colon only: values such as URLs contain ':'
        # themselves and must be returned intact. Lines without a colon
        # (status line, blank separator) are skipped instead of raising.
        key, sep, value = header.partition(':')
        if sep and key.lower() == wanted:
            return value.strip()
    return None
def checkHasHeader(self, name, value):
    """Assert that response header `name` has exactly the value `value`.

    The header is looked up with getHeaderValue(); the assertion fails when
    the returned value differs from `value`.
    """
    got = self.getHeaderValue(name)
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(got, value)
def checkNoHeader(self, name):
    """Assert that header `name` is not set, by expecting None as its value."""
    expected = None
    self.checkHasHeader(name, expected)
133 # for some reason, @unittest.skipIf() is not applied to derived classes with some versions of Python
134 if 'SKIP_DOH_TESTS' in os
.environ
:
135 raise unittest
.SkipTest('DNS over HTTPS tests are disabled')
137 cls
.startResponders()
141 print("Launching tests..")
144 # def openDOHConnection(cls, port, caFile, timeout=2.0):
145 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
146 # sslctx.load_verify_locations(caFile)
147 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
150 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
151 # url = cls.getDOHGetURL(baseurl, query)
154 # cls._toResponderQueue.put(response, True, timeout)
156 # conn.request('GET', url)
159 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
161 # data = conn.get_response()
165 # message = dns.message.from_wire(data)
167 # if useQueue and not cls._fromResponderQueue.empty():
168 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
169 # return (receivedQuery, message)
173 class TestDOH(DNSDistDOHTest
):
175 _serverKey
= 'server.key'
176 _serverCert
= 'server.chain'
177 _serverName
= 'tls.tests.dnsdist.org'
179 _dohServerPort
= 8443
180 _customResponseHeader1
= 'access-control-allow-origin: *'
181 _customResponseHeader2
= 'user-agent: derp'
182 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
183 _config_template
= """
184 newServer{address="127.0.0.1:%s"}
186 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})
187 dohFE = getDOHFrontend(0)
188 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['FoO']='bar'})})
190 addAction("drop.doh.tests.powerdns.com.", DropAction())
191 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
192 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
193 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
194 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
195 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
196 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
197 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
199 function dohHandler(dq)
200 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
201 local foundct = false
202 for key,value in pairs(dq:getHTTPHeaders()) do
203 if key == 'content-type' and value == 'application/dns-message' then
209 dq:setHTTPResponse(200, 'It works!', 'text/plain')
211 return DNSAction.HeaderModify
214 return DNSAction.None
216 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
218 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
220 def testDOHSimple(self
):
224 name
= 'simple.doh.tests.powerdns.com.'
225 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
227 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
229 response
= dns
.message
.make_response(query
)
230 rrset
= dns
.rrset
.from_text(name
,
235 response
.answer
.append(rrset
)
237 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
238 self
.assertTrue(receivedQuery
)
239 self
.assertTrue(receivedResponse
)
240 receivedQuery
.id = expectedQuery
.id
241 self
.assertEquals(expectedQuery
, receivedQuery
)
242 self
.assertTrue((self
._customResponseHeader
1) in self
._response
_headers
.decode())
243 self
.assertTrue((self
._customResponseHeader
2) in self
._response
_headers
.decode())
244 self
.assertFalse(('UPPERCASE: VaLuE' in self
._response
_headers
.decode()))
245 self
.assertTrue(('uppercase: VaLuE' in self
._response
_headers
.decode()))
246 self
.assertTrue(('cache-control: max-age=3600' in self
._response
_headers
.decode()))
247 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
248 self
.assertEquals(response
, receivedResponse
)
249 self
.checkHasHeader('cache-control', 'max-age=3600')
251 def testDOHSimplePOST(self
):
253 DOH: Simple POST query
255 name
= 'simple-post.doh.tests.powerdns.com.'
256 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
258 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
260 response
= dns
.message
.make_response(query
)
261 rrset
= dns
.rrset
.from_text(name
,
266 response
.answer
.append(rrset
)
268 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
269 self
.assertTrue(receivedQuery
)
270 self
.assertTrue(receivedResponse
)
271 receivedQuery
.id = expectedQuery
.id
272 self
.assertEquals(expectedQuery
, receivedQuery
)
273 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
274 self
.assertEquals(response
, receivedResponse
)
276 def testDOHExistingEDNS(self
):
280 name
= 'existing-edns.doh.tests.powerdns.com.'
281 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
283 response
= dns
.message
.make_response(query
)
284 rrset
= dns
.rrset
.from_text(name
,
289 response
.answer
.append(rrset
)
291 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
292 self
.assertTrue(receivedQuery
)
293 self
.assertTrue(receivedResponse
)
294 receivedQuery
.id = query
.id
295 self
.assertEquals(query
, receivedQuery
)
296 self
.assertEquals(response
, receivedResponse
)
297 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
298 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
300 def testDOHExistingECS(self
):
302 DOH: Existing EDNS Client Subnet
304 name
= 'existing-ecs.doh.tests.powerdns.com.'
305 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
306 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
307 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
309 response
= dns
.message
.make_response(query
)
310 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
311 rrset
= dns
.rrset
.from_text(name
,
316 response
.answer
.append(rrset
)
318 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
319 self
.assertTrue(receivedQuery
)
320 self
.assertTrue(receivedResponse
)
321 receivedQuery
.id = query
.id
322 self
.assertEquals(query
, receivedQuery
)
323 self
.assertEquals(response
, receivedResponse
)
324 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
325 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
def testDropped(self):
    """
    DOH: Dropped query
    """
    name = 'drop.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # No backend response is queued and the responder queue is not read
    # (response=None, useQueue=False): only the DoH answer is inspected.
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
    # assertEquals is a deprecated alias removed in Python 3.12; assertIsNone
    # states the expectation directly.
    self.assertIsNone(receivedResponse)
336 def testRefused(self
):
340 name
= 'refused.doh.tests.powerdns.com.'
341 query
= dns
.message
.make_query(name
, 'A', 'IN')
343 query
.flags
&= ~dns
.flags
.RD
344 expectedResponse
= dns
.message
.make_response(query
)
345 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
347 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
348 self
.assertEquals(receivedResponse
, expectedResponse
)
354 name
= 'spoof.doh.tests.powerdns.com.'
355 query
= dns
.message
.make_query(name
, 'A', 'IN')
357 query
.flags
&= ~dns
.flags
.RD
358 expectedResponse
= dns
.message
.make_response(query
)
359 rrset
= dns
.rrset
.from_text(name
,
364 expectedResponse
.answer
.append(rrset
)
366 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
367 self
.assertEquals(receivedResponse
, expectedResponse
)
369 def testDOHInvalid(self
):
373 name
= 'invalid.doh.tests.powerdns.com.'
374 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
376 # first an invalid query
377 invalidQuery
= invalidQuery
.to_wire()
378 invalidQuery
= invalidQuery
[:-5]
379 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
380 self
.assertEquals(receivedResponse
, None)
382 # and now a valid one
383 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
385 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
387 response
= dns
.message
.make_response(query
)
388 rrset
= dns
.rrset
.from_text(name
,
393 response
.answer
.append(rrset
)
394 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
395 self
.assertTrue(receivedQuery
)
396 self
.assertTrue(receivedResponse
)
397 receivedQuery
.id = expectedQuery
.id
398 self
.assertEquals(expectedQuery
, receivedQuery
)
399 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
400 self
.assertEquals(response
, receivedResponse
)
def testDOHWithoutQuery(self):
    """
    DOH: Empty GET query
    """
    # The bare base URL carries no `dns` parameter at all.
    url = self._dohBaseURL
    conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
    conn.setopt(pycurl.URL, url)
    # Resolve the server name locally so the request goes to 127.0.0.1.
    conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
    conn.setopt(pycurl.SSL_VERIFYPEER, 1)
    conn.setopt(pycurl.SSL_VERIFYHOST, 2)
    conn.setopt(pycurl.CAINFO, self._caCert)
    # The response body is irrelevant here; only the status code is checked.
    conn.perform_rb()
    rcode = conn.getinfo(pycurl.RESPONSE_CODE)
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(rcode, 400)
418 def testDOHEmptyPOST(self
):
420 DOH: Empty POST query
422 name
= 'empty-post.doh.tests.powerdns.com.'
424 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
="", rawQuery
=True, response
=None, caFile
=self
._caCert
)
425 self
.assertEquals(receivedResponse
, None)
427 # and now a valid one
428 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
430 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
432 response
= dns
.message
.make_response(query
)
433 rrset
= dns
.rrset
.from_text(name
,
438 response
.answer
.append(rrset
)
439 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
440 self
.assertTrue(receivedQuery
)
441 self
.assertTrue(receivedResponse
)
442 receivedQuery
.id = expectedQuery
.id
443 self
.assertEquals(expectedQuery
, receivedQuery
)
444 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
445 self
.assertEquals(response
, receivedResponse
)
447 def testHeaderRule(self
):
451 name
= 'header-rule.doh.tests.powerdns.com.'
452 query
= dns
.message
.make_query(name
, 'A', 'IN')
454 query
.flags
&= ~dns
.flags
.RD
455 expectedResponse
= dns
.message
.make_response(query
)
456 rrset
= dns
.rrset
.from_text(name
,
461 expectedResponse
.answer
.append(rrset
)
463 # this header should match
464 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False, customHeaders
=['x-powerdnS: aaaaa'])
465 self
.assertEquals(receivedResponse
, expectedResponse
)
467 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
468 expectedQuery
.flags
&= ~dns
.flags
.RD
470 response
= dns
.message
.make_response(query
)
471 rrset
= dns
.rrset
.from_text(name
,
476 response
.answer
.append(rrset
)
478 # this content of the header should NOT match
479 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
, customHeaders
=['x-powerdnS: bbbbb'])
480 self
.assertTrue(receivedQuery
)
481 self
.assertTrue(receivedResponse
)
482 receivedQuery
.id = expectedQuery
.id
483 self
.assertEquals(expectedQuery
, receivedQuery
)
484 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
485 self
.assertEquals(response
, receivedResponse
)
487 def testHTTPPath(self
):
491 name
= 'http-path.doh.tests.powerdns.com.'
492 query
= dns
.message
.make_query(name
, 'A', 'IN')
494 query
.flags
&= ~dns
.flags
.RD
495 expectedResponse
= dns
.message
.make_response(query
)
496 rrset
= dns
.rrset
.from_text(name
,
501 expectedResponse
.answer
.append(rrset
)
503 # this path should match
504 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
505 self
.assertEquals(receivedResponse
, expectedResponse
)
507 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
509 expectedQuery
.flags
&= ~dns
.flags
.RD
510 response
= dns
.message
.make_response(query
)
511 rrset
= dns
.rrset
.from_text(name
,
516 response
.answer
.append(rrset
)
518 # this path should NOT match
519 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
520 self
.assertTrue(receivedQuery
)
521 self
.assertTrue(receivedResponse
)
522 receivedQuery
.id = expectedQuery
.id
523 self
.assertEquals(expectedQuery
, receivedQuery
)
524 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
525 self
.assertEquals(response
, receivedResponse
)
527 def testHTTPPathRegex(self
):
531 name
= 'http-path-regex.doh.tests.powerdns.com.'
532 query
= dns
.message
.make_query(name
, 'A', 'IN')
534 query
.flags
&= ~dns
.flags
.RD
535 expectedResponse
= dns
.message
.make_response(query
)
536 rrset
= dns
.rrset
.from_text(name
,
541 expectedResponse
.answer
.append(rrset
)
543 # this path should match
544 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS-999', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
545 self
.assertEquals(receivedResponse
, expectedResponse
)
547 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
549 expectedQuery
.flags
&= ~dns
.flags
.RD
550 response
= dns
.message
.make_response(query
)
551 rrset
= dns
.rrset
.from_text(name
,
556 response
.answer
.append(rrset
)
558 # this path should NOT match
559 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
560 self
.assertTrue(receivedQuery
)
561 self
.assertTrue(receivedResponse
)
562 receivedQuery
.id = expectedQuery
.id
563 self
.assertEquals(expectedQuery
, receivedQuery
)
564 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
565 self
.assertEquals(response
, receivedResponse
)
567 def testHTTPStatusAction200(self
):
569 DOH: HTTPStatusAction 200 OK
571 name
= 'http-status-action.doh.tests.powerdns.com.'
572 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
575 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
576 self
.assertTrue(receivedResponse
)
577 self
.assertEquals(receivedResponse
, b
'Plaintext answer')
578 self
.assertEquals(self
._rcode
, 200)
579 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
581 def testHTTPStatusAction307(self
):
583 DOH: HTTPStatusAction 307
585 name
= 'http-status-action-redirect.doh.tests.powerdns.com.'
586 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
589 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
590 self
.assertTrue(receivedResponse
)
591 self
.assertEquals(self
._rcode
, 307)
592 self
.assertTrue('location: https://doh.powerdns.org' in self
._response
_headers
.decode())
594 def testHTTPLuaResponse(self
):
596 DOH: Lua HTTP Response
598 name
= 'http-lua.doh.tests.powerdns.com.'
599 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
602 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
603 self
.assertTrue(receivedResponse
)
604 self
.assertEquals(receivedResponse
, b
'It works!')
605 self
.assertEquals(self
._rcode
, 200)
606 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
608 def testHTTPEarlyResponse(self
):
610 DOH: HTTP Early Response
612 response_headers
= BytesIO()
613 url
= self
._dohBaseURL
+ 'coffee'
614 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
615 conn
.setopt(pycurl
.URL
, url
)
616 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
617 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
618 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
619 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
620 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
621 data
= conn
.perform_rb()
622 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
623 headers
= response_headers
.getvalue().decode()
625 self
.assertEquals(rcode
, 418)
626 self
.assertEquals(data
, b
'C0FFEE')
627 self
.assertIn('foo: bar', headers
)
628 self
.assertNotIn(self
._customResponseHeader
2, headers
)
630 response_headers
= BytesIO()
631 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
632 conn
.setopt(pycurl
.URL
, url
)
633 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
634 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
635 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
636 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
637 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
638 conn
.setopt(pycurl
.POST
, True)
640 conn
.setopt(pycurl
.POSTFIELDS
, data
)
642 data
= conn
.perform_rb()
643 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
644 headers
= response_headers
.getvalue().decode()
645 self
.assertEquals(rcode
, 418)
646 self
.assertEquals(data
, b
'C0FFEE')
647 self
.assertIn('foo: bar', headers
)
648 self
.assertNotIn(self
._customResponseHeader
2, headers
)
650 class TestDOHAddingECS(DNSDistDOHTest
):
652 _serverKey
= 'server.key'
653 _serverCert
= 'server.chain'
654 _serverName
= 'tls.tests.dnsdist.org'
656 _dohServerPort
= 8443
657 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
658 _config_template
= """
659 newServer{address="127.0.0.1:%s", useClientSubnet=true}
660 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
663 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
665 def testDOHSimple(self
):
667 DOH with ECS: Simple query
669 name
= 'simple.doh-ecs.tests.powerdns.com.'
670 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
672 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
673 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
674 response
= dns
.message
.make_response(query
)
675 rrset
= dns
.rrset
.from_text(name
,
680 response
.answer
.append(rrset
)
682 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
683 self
.assertTrue(receivedQuery
)
684 self
.assertTrue(receivedResponse
)
685 expectedQuery
.id = receivedQuery
.id
686 self
.assertEquals(expectedQuery
, receivedQuery
)
687 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
688 self
.assertEquals(response
, receivedResponse
)
689 self
.checkResponseNoEDNS(response
, receivedResponse
)
691 def testDOHExistingEDNS(self
):
693 DOH with ECS: Existing EDNS
695 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
696 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
698 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
699 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
700 response
= dns
.message
.make_response(query
)
701 rrset
= dns
.rrset
.from_text(name
,
706 response
.answer
.append(rrset
)
708 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
709 self
.assertTrue(receivedQuery
)
710 self
.assertTrue(receivedResponse
)
711 receivedQuery
.id = expectedQuery
.id
712 self
.assertEquals(expectedQuery
, receivedQuery
)
713 self
.assertEquals(response
, receivedResponse
)
714 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
715 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
717 def testDOHExistingECS(self
):
719 DOH with ECS: Existing EDNS Client Subnet
721 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
722 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
723 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
724 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
726 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
727 response
= dns
.message
.make_response(query
)
728 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
729 rrset
= dns
.rrset
.from_text(name
,
734 response
.answer
.append(rrset
)
736 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
737 self
.assertTrue(receivedQuery
)
738 self
.assertTrue(receivedResponse
)
739 receivedQuery
.id = expectedQuery
.id
740 self
.assertEquals(expectedQuery
, receivedQuery
)
741 self
.assertEquals(response
, receivedResponse
)
742 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
743 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
745 class TestDOHOverHTTP(DNSDistDOHTest
):
747 _dohServerPort
= 8480
748 _serverName
= 'tls.tests.dnsdist.org'
749 _dohBaseURL
= ("http://%s:%d/" % (_serverName
, _dohServerPort
))
750 _config_template
= """
751 newServer{address="127.0.0.1:%s"}
752 addDOHLocal("127.0.0.1:%s")
754 _config_params
= ['_testServerPort', '_dohServerPort']
755 _checkConfigExpectedOutput
= b
"""No certificate provided for DoH endpoint 127.0.0.1:8480, running in DNS over HTTP mode instead of DNS over HTTPS
756 Configuration 'configs/dnsdist_TestDOHOverHTTP.conf' OK!
759 def testDOHSimple(self
):
761 DOH over HTTP: Simple query
763 name
= 'simple.doh-over-http.tests.powerdns.com.'
764 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
766 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
767 response
= dns
.message
.make_response(query
)
768 rrset
= dns
.rrset
.from_text(name
,
773 response
.answer
.append(rrset
)
775 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
776 self
.assertTrue(receivedQuery
)
777 self
.assertTrue(receivedResponse
)
778 expectedQuery
.id = receivedQuery
.id
779 self
.assertEquals(expectedQuery
, receivedQuery
)
780 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
781 self
.assertEquals(response
, receivedResponse
)
782 self
.checkResponseNoEDNS(response
, receivedResponse
)
784 def testDOHSimplePOST(self
):
786 DOH over HTTP: Simple POST query
788 name
= 'simple-post.doh-over-http.tests.powerdns.com.'
789 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
791 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
793 response
= dns
.message
.make_response(query
)
794 rrset
= dns
.rrset
.from_text(name
,
799 response
.answer
.append(rrset
)
801 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
802 self
.assertTrue(receivedQuery
)
803 self
.assertTrue(receivedResponse
)
804 receivedQuery
.id = expectedQuery
.id
805 self
.assertEquals(expectedQuery
, receivedQuery
)
806 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
807 self
.assertEquals(response
, receivedResponse
)
808 self
.checkResponseNoEDNS(response
, receivedResponse
)
810 class TestDOHWithCache(DNSDistDOHTest
):
812 _serverKey
= 'server.key'
813 _serverCert
= 'server.chain'
814 _serverName
= 'tls.tests.dnsdist.org'
816 _dohServerPort
= 8443
817 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
818 _config_template
= """
819 newServer{address="127.0.0.1:%s"}
821 addDOHLocal("127.0.0.1:%s", "%s", "%s")
823 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
824 getPool(""):setCache(pc)
826 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
828 def testDOHCacheLargeAnswer(self
):
830 DOH with cache: Check that we can cache (and retrieve) large answers
833 name
= 'large.doh-with-cache.tests.powerdns.com.'
834 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
836 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
838 response
= dns
.message
.make_response(query
)
839 # we prepare a large answer
843 content
= content
+ ', '
844 content
= content
+ (str(i
)*50)
846 content
= content
+ 'A'*40
848 rrset
= dns
.rrset
.from_text(name
,
853 response
.answer
.append(rrset
)
854 self
.assertEquals(len(response
.to_wire()), 4096)
856 # first query to fill the cache
857 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
858 self
.assertTrue(receivedQuery
)
859 self
.assertTrue(receivedResponse
)
860 receivedQuery
.id = expectedQuery
.id
861 self
.assertEquals(expectedQuery
, receivedQuery
)
862 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
863 self
.assertEquals(response
, receivedResponse
)
864 self
.checkHasHeader('cache-control', 'max-age=3600')
866 for _
in range(numberOfQueries
):
867 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False)
868 self
.assertEquals(receivedResponse
, response
)
869 self
.checkHasHeader('cache-control', 'max-age=' + str(receivedResponse
.answer
[0].ttl
))
873 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False)
874 self
.assertEquals(receivedResponse
, response
)
875 self
.checkHasHeader('cache-control', 'max-age=' + str(receivedResponse
.answer
[0].ttl
))
877 class TestDOHWithoutCacheControl(DNSDistDOHTest
):
879 _serverKey
= 'server.key'
880 _serverCert
= 'server.chain'
881 _serverName
= 'tls.tests.dnsdist.org'
883 _dohServerPort
= 8443
884 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
885 _config_template
= """
886 newServer{address="127.0.0.1:%s"}
888 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {sendCacheControlHeaders=false})
890 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
892 def testDOHSimple(self
):
894 DOH without cache-control
896 name
= 'simple.doh.tests.powerdns.com.'
897 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
899 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
901 response
= dns
.message
.make_response(query
)
902 rrset
= dns
.rrset
.from_text(name
,
907 response
.answer
.append(rrset
)
909 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
910 self
.assertTrue(receivedQuery
)
911 self
.assertTrue(receivedResponse
)
912 receivedQuery
.id = expectedQuery
.id
913 self
.assertEquals(expectedQuery
, receivedQuery
)
914 self
.checkNoHeader('cache-control')
915 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
916 self
.assertEquals(response
, receivedResponse
)