8 import clientsubnetoption
9 from dnsdisttests
import DNSDistTest
12 from io
import BytesIO
13 #from hyper import HTTP20Connection
14 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
16 @unittest.skipIf('SKIP_DOH_TESTS' in os
.environ
, 'DNS over HTTPS tests are disabled')
17 class DNSDistDOHTest(DNSDistTest
):
def getDOHGetURL(cls, baseurl, query, rawQuery=False):
    """
    Build the URL of a DoH GET request carrying `query`.

    Args:
        baseurl: URL of the DoH endpoint, without any query string.
        query: the DNS query; an object exposing ``to_wire()`` or, when
            `rawQuery` is true, the already-serialized wire bytes.
        rawQuery: when true, `query` is used as-is instead of being
            serialized with ``to_wire()``. This lets callers send
            deliberately malformed payloads.

    Returns:
        `baseurl` followed by ``?dns=`` and the URL-safe base64 encoding of
        the wire data with the trailing ``=`` padding removed.
    """
    wire = query if rawQuery else query.to_wire()
    # the padding characters are stripped from the encoded parameter
    param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
    return baseurl + "?dns=" + param
def openDOHConnection(cls, port, caFile, timeout=2.0):
    """
    Create a pycurl handle pre-configured for DoH requests.

    The handle is set to use HTTP/2 and to send the
    ``application/dns-message`` Content-type and Accept headers.

    Args:
        port, caFile, timeout: accepted for the callers' convenience but not
            used here; callers set the URL and TLS options on the returned
            handle themselves.

    Returns:
        The configured ``pycurl.Curl`` handle.
    """
    conn = pycurl.Curl()
    conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
    conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                     "Accept: application/dns-message"])
    return conn
def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None,
                 useQueue=True, rawQuery=False, rawResponse=False, customHeaders=None, useHTTPS=True):
    """
    Send `query` to the DoH frontend with an HTTP GET and collect the result.

    Args:
        port: port of the DoH frontend.
        servername: host name used in the URL; it is pinned to 127.0.0.1.
        baseurl: URL of the DoH endpoint, without the ``dns`` parameter.
        query: DNS query (or raw wire bytes when `rawQuery` is true).
        response: answer handed to the responder queue, if any.
        timeout: timeout, in seconds, for the queue operations.
        caFile: CA bundle used to verify the server certificate.
        useQueue: whether to fetch the query seen by the responder.
        rawQuery: pass `query` through without calling ``to_wire()``.
        rawResponse: return the HTTP body untouched instead of parsing it.
        customHeaders: list of extra request header lines.
        useHTTPS: when false, no TLS option is set on the connection.

    Returns:
        A ``(receivedQuery, message)`` tuple. ``receivedQuery`` is the query
        taken from the responder queue, or None. ``message`` is the raw body
        when `rawResponse` is true, the parsed DNS message on HTTP 200, and
        None otherwise.

    Side effects:
        Stores the HTTP status code in ``cls._rcode`` and the raw response
        headers (bytes) in ``cls._response_headers``.
    """
    if customHeaders is None:
        customHeaders = []

    url = cls.getDOHGetURL(baseurl, query, rawQuery)
    conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
    response_headers = BytesIO()
    #conn.setopt(pycurl.VERBOSE, True)
    conn.setopt(pycurl.URL, url)
    # pin the server name to the loopback address
    conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
    if useHTTPS:
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        if caFile:
            conn.setopt(pycurl.CAINFO, caFile)

    conn.setopt(pycurl.HTTPHEADER, customHeaders)
    conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)

    if response is not None:
        cls._toResponderQueue.put(response, True, timeout)

    receivedQuery = None
    message = None
    # kept as bytes so that readers calling .decode() work even if the
    # request below raises
    cls._response_headers = b''
    data = conn.perform_rb()
    cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
    if rawResponse:
        message = data
    elif cls._rcode == 200:
        message = dns.message.from_wire(data)

    if useQueue and not cls._fromResponderQueue.empty():
        receivedQuery = cls._fromResponderQueue.get(True, timeout)

    cls._response_headers = response_headers.getvalue()
    return (receivedQuery, message)
def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None,
                     useQueue=True, rawQuery=False, rawResponse=False, customHeaders=None, useHTTPS=True):
    """
    Send `query` to the DoH frontend with an HTTP POST and collect the result.

    The query is carried in the request body and the URL is `baseurl` as-is.

    Args:
        port: port of the DoH frontend.
        servername: host name used in the URL; it is pinned to 127.0.0.1.
        baseurl: URL of the DoH endpoint.
        query: DNS query (or raw payload when `rawQuery` is true).
        response: answer handed to the responder queue, if any.
        timeout: timeout, in seconds, for the queue operations.
        caFile: CA bundle used to verify the server certificate.
        useQueue: whether to fetch the query seen by the responder.
        rawQuery: post `query` untouched instead of calling ``to_wire()``.
        rawResponse: return the HTTP body untouched instead of parsing it.
        customHeaders: list of extra request header lines.
        useHTTPS: when false, no TLS option is set on the connection.

    Returns:
        A ``(receivedQuery, message)`` tuple. ``receivedQuery`` is the query
        taken from the responder queue, or None. ``message`` is the raw body
        when `rawResponse` is true, the parsed DNS message on HTTP 200, and
        None otherwise.

    Side effects:
        Stores the HTTP status code in ``cls._rcode`` and the raw response
        headers (bytes) in ``cls._response_headers``.
    """
    if customHeaders is None:
        customHeaders = []

    url = baseurl
    conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
    response_headers = BytesIO()
    #conn.setopt(pycurl.VERBOSE, True)
    conn.setopt(pycurl.URL, url)
    # pin the server name to the loopback address
    conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
    if useHTTPS:
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        if caFile:
            conn.setopt(pycurl.CAINFO, caFile)

    conn.setopt(pycurl.HTTPHEADER, customHeaders)
    conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
    conn.setopt(pycurl.POST, True)
    payload = query if rawQuery else query.to_wire()
    conn.setopt(pycurl.POSTFIELDS, payload)

    if response is not None:
        cls._toResponderQueue.put(response, True, timeout)

    receivedQuery = None
    message = None
    # kept as bytes so that readers calling .decode() work even if the
    # request below raises
    cls._response_headers = b''
    data = conn.perform_rb()
    cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
    if rawResponse:
        message = data
    elif cls._rcode == 200:
        message = dns.message.from_wire(data)

    if useQueue and not cls._fromResponderQueue.empty():
        receivedQuery = cls._fromResponderQueue.get(True, timeout)

    cls._response_headers = response_headers.getvalue()
    return (receivedQuery, message)
def getHeaderValue(self, name):
    """
    Return the value of the response header `name`, or None if it is absent.

    The lookup is case-insensitive and reads ``self._response_headers``.
    Only the first colon of a header line separates the name from the value,
    so values that contain colons themselves (URLs, for instance) are
    returned in full.
    """
    raw = self._response_headers
    # tolerate headers that were stored as text rather than bytes
    text = raw.decode() if isinstance(raw, bytes) else raw
    wanted = name.lower()
    for header in text.splitlines(False):
        key, sep, value = header.partition(':')
        # lines without a colon (status line, blank line) are not headers
        if sep and key.lower() == wanted:
            return value.strip()
    return None
def checkHasHeader(self, name, value):
    """
    Assert that the response header `name` has exactly the value `value`.

    The header is looked up with ``getHeaderValue()``.
    """
    got = self.getHeaderValue(name)
    # assertEquals is a deprecated alias that was removed in Python 3.12
    self.assertEqual(got, value)
def checkNoHeader(self, name):
    """
    Assert that the response header `name` is absent.

    Delegates to ``checkHasHeader()`` with None as the expected value.
    """
    expected = None
    self.checkHasHeader(name, expected)
133 # for some reason, @unittest.skipIf() is not applied to derived classes with some versions of Python
134 if 'SKIP_DOH_TESTS' in os
.environ
:
135 raise unittest
.SkipTest('DNS over HTTPS tests are disabled')
137 cls
.startResponders()
141 print("Launching tests..")
144 # def openDOHConnection(cls, port, caFile, timeout=2.0):
145 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
146 # sslctx.load_verify_locations(caFile)
147 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
150 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
151 # url = cls.getDOHGetURL(baseurl, query)
154 # cls._toResponderQueue.put(response, True, timeout)
156 # conn.request('GET', url)
159 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
161 # data = conn.get_response()
165 # message = dns.message.from_wire(data)
167 # if useQueue and not cls._fromResponderQueue.empty():
168 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
169 # return (receivedQuery, message)
173 class TestDOH(DNSDistDOHTest
):
175 _serverKey
= 'server.key'
176 _serverCert
= 'server.chain'
177 _serverName
= 'tls.tests.dnsdist.org'
179 _dohServerPort
= 8443
180 _customResponseHeader1
= 'access-control-allow-origin: *'
181 _customResponseHeader2
= 'user-agent: derp'
182 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
183 _config_template
= """
184 newServer{address="127.0.0.1:%s"}
186 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/", "/coffee", "/PowerDNS", "/PowerDNS2", "/PowerDNS-999" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})
187 dohFE = getDOHFrontend(0)
188 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['FoO']='bar'})})
190 addAction("drop.doh.tests.powerdns.com.", DropAction())
191 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
192 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
193 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
194 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
195 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
196 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
197 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
199 function dohHandler(dq)
200 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
201 local foundct = false
202 for key,value in pairs(dq:getHTTPHeaders()) do
203 if key == 'content-type' and value == 'application/dns-message' then
209 dq:setHTTPResponse(200, 'It works!', 'text/plain')
211 return DNSAction.HeaderModify
214 return DNSAction.None
216 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
218 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
220 def testDOHSimple(self
):
224 name
= 'simple.doh.tests.powerdns.com.'
225 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
227 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
229 response
= dns
.message
.make_response(query
)
230 rrset
= dns
.rrset
.from_text(name
,
235 response
.answer
.append(rrset
)
237 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
238 self
.assertTrue(receivedQuery
)
239 self
.assertTrue(receivedResponse
)
240 receivedQuery
.id = expectedQuery
.id
241 self
.assertEquals(expectedQuery
, receivedQuery
)
242 self
.assertTrue((self
._customResponseHeader
1) in self
._response
_headers
.decode())
243 self
.assertTrue((self
._customResponseHeader
2) in self
._response
_headers
.decode())
244 self
.assertFalse(('UPPERCASE: VaLuE' in self
._response
_headers
.decode()))
245 self
.assertTrue(('uppercase: VaLuE' in self
._response
_headers
.decode()))
246 self
.assertTrue(('cache-control: max-age=3600' in self
._response
_headers
.decode()))
247 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
248 self
.assertEquals(response
, receivedResponse
)
249 self
.checkHasHeader('cache-control', 'max-age=3600')
251 def testDOHSimplePOST(self
):
253 DOH: Simple POST query
255 name
= 'simple-post.doh.tests.powerdns.com.'
256 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
258 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
260 response
= dns
.message
.make_response(query
)
261 rrset
= dns
.rrset
.from_text(name
,
266 response
.answer
.append(rrset
)
268 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
269 self
.assertTrue(receivedQuery
)
270 self
.assertTrue(receivedResponse
)
271 receivedQuery
.id = expectedQuery
.id
272 self
.assertEquals(expectedQuery
, receivedQuery
)
273 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
274 self
.assertEquals(response
, receivedResponse
)
276 def testDOHExistingEDNS(self
):
280 name
= 'existing-edns.doh.tests.powerdns.com.'
281 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
283 response
= dns
.message
.make_response(query
)
284 rrset
= dns
.rrset
.from_text(name
,
289 response
.answer
.append(rrset
)
291 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
292 self
.assertTrue(receivedQuery
)
293 self
.assertTrue(receivedResponse
)
294 receivedQuery
.id = query
.id
295 self
.assertEquals(query
, receivedQuery
)
296 self
.assertEquals(response
, receivedResponse
)
297 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
298 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
300 def testDOHExistingECS(self
):
302 DOH: Existing EDNS Client Subnet
304 name
= 'existing-ecs.doh.tests.powerdns.com.'
305 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
306 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
307 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
309 response
= dns
.message
.make_response(query
)
310 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
311 rrset
= dns
.rrset
.from_text(name
,
316 response
.answer
.append(rrset
)
318 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
319 self
.assertTrue(receivedQuery
)
320 self
.assertTrue(receivedResponse
)
321 receivedQuery
.id = query
.id
322 self
.assertEquals(query
, receivedQuery
)
323 self
.assertEquals(response
, receivedResponse
)
324 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
325 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
def testDropped(self):
    """
    DOH: Dropped query
    """
    # this name is matched by the DropAction() rule of the configuration
    name = 'drop.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL,
                                              caFile=self._caCert, query=query, response=None, useQueue=False)
    self.assertIsNone(receivedResponse)
def testRefused(self):
    """
    DOH: Refused
    """
    # this name is matched by the RCodeAction(DNSRCode.REFUSED) rule
    name = 'refused.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # NOTE(review): RD is cleared presumably so that the locally built
    # expected response matches the answer generated by dnsdist - confirm
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL,
                                              caFile=self._caCert, query=query, response=None, useQueue=False)
    self.assertEqual(receivedResponse, expectedResponse)
354 name
= 'spoof.doh.tests.powerdns.com.'
355 query
= dns
.message
.make_query(name
, 'A', 'IN')
357 query
.flags
&= ~dns
.flags
.RD
358 expectedResponse
= dns
.message
.make_response(query
)
359 rrset
= dns
.rrset
.from_text(name
,
364 expectedResponse
.answer
.append(rrset
)
366 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
367 self
.assertEquals(receivedResponse
, expectedResponse
)
369 def testDOHInvalid(self
):
373 name
= 'invalid.doh.tests.powerdns.com.'
374 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
376 # first an invalid query
377 invalidQuery
= invalidQuery
.to_wire()
378 invalidQuery
= invalidQuery
[:-5]
379 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
380 self
.assertEquals(receivedResponse
, None)
382 # and now a valid one
383 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
385 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
387 response
= dns
.message
.make_response(query
)
388 rrset
= dns
.rrset
.from_text(name
,
393 response
.answer
.append(rrset
)
394 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
395 self
.assertTrue(receivedQuery
)
396 self
.assertTrue(receivedResponse
)
397 receivedQuery
.id = expectedQuery
.id
398 self
.assertEquals(expectedQuery
, receivedQuery
)
399 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
400 self
.assertEquals(response
, receivedResponse
)
def testDOHWithoutQuery(self):
    """
    DOH: Empty GET query
    """
    # the base URL is requested as-is, without any 'dns' parameter
    url = self._dohBaseURL
    conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
    conn.setopt(pycurl.URL, url)
    conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
    conn.setopt(pycurl.SSL_VERIFYPEER, 1)
    conn.setopt(pycurl.SSL_VERIFYHOST, 2)
    conn.setopt(pycurl.CAINFO, self._caCert)
    # the body is not inspected, only the HTTP status code matters
    conn.perform_rb()
    rcode = conn.getinfo(pycurl.RESPONSE_CODE)
    self.assertEqual(rcode, 400)
418 def testDOHEmptyPOST(self
):
420 DOH: Empty POST query
422 name
= 'empty-post.doh.tests.powerdns.com.'
424 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
="", rawQuery
=True, response
=None, caFile
=self
._caCert
)
425 self
.assertEquals(receivedResponse
, None)
427 # and now a valid one
428 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
430 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
432 response
= dns
.message
.make_response(query
)
433 rrset
= dns
.rrset
.from_text(name
,
438 response
.answer
.append(rrset
)
439 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
440 self
.assertTrue(receivedQuery
)
441 self
.assertTrue(receivedResponse
)
442 receivedQuery
.id = expectedQuery
.id
443 self
.assertEquals(expectedQuery
, receivedQuery
)
444 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
445 self
.assertEquals(response
, receivedResponse
)
447 def testHeaderRule(self
):
451 name
= 'header-rule.doh.tests.powerdns.com.'
452 query
= dns
.message
.make_query(name
, 'A', 'IN')
454 query
.flags
&= ~dns
.flags
.RD
455 expectedResponse
= dns
.message
.make_response(query
)
456 rrset
= dns
.rrset
.from_text(name
,
461 expectedResponse
.answer
.append(rrset
)
463 # this header should match
464 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False, customHeaders
=['x-powerdnS: aaaaa'])
465 self
.assertEquals(receivedResponse
, expectedResponse
)
467 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
468 expectedQuery
.flags
&= ~dns
.flags
.RD
470 response
= dns
.message
.make_response(query
)
471 rrset
= dns
.rrset
.from_text(name
,
476 response
.answer
.append(rrset
)
478 # this content of the header should NOT match
479 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
, customHeaders
=['x-powerdnS: bbbbb'])
480 self
.assertTrue(receivedQuery
)
481 self
.assertTrue(receivedResponse
)
482 receivedQuery
.id = expectedQuery
.id
483 self
.assertEquals(expectedQuery
, receivedQuery
)
484 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
485 self
.assertEquals(response
, receivedResponse
)
487 def testHTTPPath(self
):
491 name
= 'http-path.doh.tests.powerdns.com.'
492 query
= dns
.message
.make_query(name
, 'A', 'IN')
494 query
.flags
&= ~dns
.flags
.RD
495 expectedResponse
= dns
.message
.make_response(query
)
496 rrset
= dns
.rrset
.from_text(name
,
501 expectedResponse
.answer
.append(rrset
)
503 # this path should match
504 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
505 self
.assertEquals(receivedResponse
, expectedResponse
)
507 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
509 expectedQuery
.flags
&= ~dns
.flags
.RD
510 response
= dns
.message
.make_response(query
)
511 rrset
= dns
.rrset
.from_text(name
,
516 response
.answer
.append(rrset
)
518 # this path should NOT match
519 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
520 self
.assertTrue(receivedQuery
)
521 self
.assertTrue(receivedResponse
)
522 receivedQuery
.id = expectedQuery
.id
523 self
.assertEquals(expectedQuery
, receivedQuery
)
524 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
525 self
.assertEquals(response
, receivedResponse
)
527 # this path is not in the URLs map and should lead to a 404
528 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS/something", query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
529 self
.assertTrue(receivedResponse
)
530 self
.assertEquals(receivedResponse
, b
'there is no endpoint configured for this path')
531 self
.assertEquals(self
._rcode
, 404)
533 def testHTTPPathRegex(self
):
537 name
= 'http-path-regex.doh.tests.powerdns.com.'
538 query
= dns
.message
.make_query(name
, 'A', 'IN')
540 query
.flags
&= ~dns
.flags
.RD
541 expectedResponse
= dns
.message
.make_response(query
)
542 rrset
= dns
.rrset
.from_text(name
,
547 expectedResponse
.answer
.append(rrset
)
549 # this path should match
550 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS-999', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
551 self
.assertEquals(receivedResponse
, expectedResponse
)
553 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
555 expectedQuery
.flags
&= ~dns
.flags
.RD
556 response
= dns
.message
.make_response(query
)
557 rrset
= dns
.rrset
.from_text(name
,
562 response
.answer
.append(rrset
)
564 # this path should NOT match
565 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
566 self
.assertTrue(receivedQuery
)
567 self
.assertTrue(receivedResponse
)
568 receivedQuery
.id = expectedQuery
.id
569 self
.assertEquals(expectedQuery
, receivedQuery
)
570 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
571 self
.assertEquals(response
, receivedResponse
)
def testHTTPStatusAction200(self):
    """
    DOH: HTTPStatusAction 200 OK
    """
    # this name is matched by the HTTPStatusAction(200, ...) rule
    name = 'http-status-action.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN', use_edns=False)

    # rawResponse: the body is plain text, not a DNS message
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query,
                                              caFile=self._caCert, useQueue=False, rawResponse=True)
    self.assertTrue(receivedResponse)
    self.assertEqual(receivedResponse, b'Plaintext answer')
    self.assertEqual(self._rcode, 200)
    self.assertIn('content-type: text/plain', self._response_headers.decode())
def testHTTPStatusAction307(self):
    """
    DOH: HTTPStatusAction 307
    """
    # this name is matched by the HTTPStatusAction(307, ...) rule
    name = 'http-status-action-redirect.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN', use_edns=False)

    # rawResponse: the body of the redirect is not a DNS message
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query,
                                              caFile=self._caCert, useQueue=False, rawResponse=True)
    self.assertTrue(receivedResponse)
    self.assertEqual(self._rcode, 307)
    self.assertIn('location: https://doh.powerdns.org', self._response_headers.decode())
def testHTTPLuaResponse(self):
    """
    DOH: Lua HTTP Response
    """
    # this name is routed to the dohHandler Lua function of the configuration
    name = 'http-lua.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN', use_edns=False)

    # rawResponse: the body set by the Lua handler is plain text
    (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query,
                                                  caFile=self._caCert, useQueue=False, rawResponse=True)
    self.assertTrue(receivedResponse)
    self.assertEqual(receivedResponse, b'It works!')
    self.assertEqual(self._rcode, 200)
    self.assertIn('content-type: text/plain', self._response_headers.decode())
def testHTTPEarlyResponse(self):
    """
    DOH: HTTP Early Response
    """
    url = self._dohBaseURL + 'coffee'

    def fetch(postBody=None):
        # Issue one request to `url` (POST when a body is given, GET
        # otherwise) and return (status code, body, decoded headers).
        response_headers = BytesIO()
        conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
        conn.setopt(pycurl.URL, url)
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        conn.setopt(pycurl.CAINFO, self._caCert)
        conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
        if postBody is not None:
            conn.setopt(pycurl.POST, True)
            conn.setopt(pycurl.POSTFIELDS, postBody)
        body = conn.perform_rb()
        status = conn.getinfo(pycurl.RESPONSE_CODE)
        return status, body, response_headers.getvalue().decode()

    def checkEarlyResponse(rcode, data, headers):
        # the answer configured for '^/coffee$' in the responses map
        self.assertEqual(rcode, 418)
        self.assertEqual(data, b'C0FFEE')
        self.assertIn('foo: bar', headers)
        self.assertNotIn(self._customResponseHeader2, headers)

    # first with a GET
    rcode, data, headers = fetch()
    checkEarlyResponse(rcode, data, headers)

    # then with a POST; the body of the previous answer is reused as payload
    rcode, data, headers = fetch(postBody=data)
    checkEarlyResponse(rcode, data, headers)
656 class TestDOHAddingECS(DNSDistDOHTest
):
658 _serverKey
= 'server.key'
659 _serverCert
= 'server.chain'
660 _serverName
= 'tls.tests.dnsdist.org'
662 _dohServerPort
= 8443
663 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
664 _config_template
= """
665 newServer{address="127.0.0.1:%s", useClientSubnet=true}
666 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
669 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
671 def testDOHSimple(self
):
673 DOH with ECS: Simple query
675 name
= 'simple.doh-ecs.tests.powerdns.com.'
676 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
678 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
679 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
680 response
= dns
.message
.make_response(query
)
681 rrset
= dns
.rrset
.from_text(name
,
686 response
.answer
.append(rrset
)
688 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
689 self
.assertTrue(receivedQuery
)
690 self
.assertTrue(receivedResponse
)
691 expectedQuery
.id = receivedQuery
.id
692 self
.assertEquals(expectedQuery
, receivedQuery
)
693 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
694 self
.assertEquals(response
, receivedResponse
)
695 self
.checkResponseNoEDNS(response
, receivedResponse
)
697 def testDOHExistingEDNS(self
):
699 DOH with ECS: Existing EDNS
701 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
702 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
704 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
705 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
706 response
= dns
.message
.make_response(query
)
707 rrset
= dns
.rrset
.from_text(name
,
712 response
.answer
.append(rrset
)
714 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
715 self
.assertTrue(receivedQuery
)
716 self
.assertTrue(receivedResponse
)
717 receivedQuery
.id = expectedQuery
.id
718 self
.assertEquals(expectedQuery
, receivedQuery
)
719 self
.assertEquals(response
, receivedResponse
)
720 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
721 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
723 def testDOHExistingECS(self
):
725 DOH with ECS: Existing EDNS Client Subnet
727 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
728 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
729 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
730 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
732 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
733 response
= dns
.message
.make_response(query
)
734 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
735 rrset
= dns
.rrset
.from_text(name
,
740 response
.answer
.append(rrset
)
742 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
743 self
.assertTrue(receivedQuery
)
744 self
.assertTrue(receivedResponse
)
745 receivedQuery
.id = expectedQuery
.id
746 self
.assertEquals(expectedQuery
, receivedQuery
)
747 self
.assertEquals(response
, receivedResponse
)
748 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
749 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
751 class TestDOHOverHTTP(DNSDistDOHTest
):
753 _dohServerPort
= 8480
754 _serverName
= 'tls.tests.dnsdist.org'
755 _dohBaseURL
= ("http://%s:%d/" % (_serverName
, _dohServerPort
))
756 _config_template
= """
757 newServer{address="127.0.0.1:%s"}
758 addDOHLocal("127.0.0.1:%s")
760 _config_params
= ['_testServerPort', '_dohServerPort']
761 _checkConfigExpectedOutput
= b
"""No certificate provided for DoH endpoint 127.0.0.1:8480, running in DNS over HTTP mode instead of DNS over HTTPS
762 Configuration 'configs/dnsdist_TestDOHOverHTTP.conf' OK!
765 def testDOHSimple(self
):
767 DOH over HTTP: Simple query
769 name
= 'simple.doh-over-http.tests.powerdns.com.'
770 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
772 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
773 response
= dns
.message
.make_response(query
)
774 rrset
= dns
.rrset
.from_text(name
,
779 response
.answer
.append(rrset
)
781 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
782 self
.assertTrue(receivedQuery
)
783 self
.assertTrue(receivedResponse
)
784 expectedQuery
.id = receivedQuery
.id
785 self
.assertEquals(expectedQuery
, receivedQuery
)
786 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
787 self
.assertEquals(response
, receivedResponse
)
788 self
.checkResponseNoEDNS(response
, receivedResponse
)
790 def testDOHSimplePOST(self
):
792 DOH over HTTP: Simple POST query
794 name
= 'simple-post.doh-over-http.tests.powerdns.com.'
795 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
797 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
799 response
= dns
.message
.make_response(query
)
800 rrset
= dns
.rrset
.from_text(name
,
805 response
.answer
.append(rrset
)
807 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
808 self
.assertTrue(receivedQuery
)
809 self
.assertTrue(receivedResponse
)
810 receivedQuery
.id = expectedQuery
.id
811 self
.assertEquals(expectedQuery
, receivedQuery
)
812 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
813 self
.assertEquals(response
, receivedResponse
)
814 self
.checkResponseNoEDNS(response
, receivedResponse
)
816 class TestDOHWithCache(DNSDistDOHTest
):
818 _serverKey
= 'server.key'
819 _serverCert
= 'server.chain'
820 _serverName
= 'tls.tests.dnsdist.org'
822 _dohServerPort
= 8443
823 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
824 _config_template
= """
825 newServer{address="127.0.0.1:%s"}
827 addDOHLocal("127.0.0.1:%s", "%s", "%s")
829 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
830 getPool(""):setCache(pc)
832 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
834 def testDOHCacheLargeAnswer(self
):
836 DOH with cache: Check that we can cache (and retrieve) large answers
839 name
= 'large.doh-with-cache.tests.powerdns.com.'
840 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
842 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
844 response
= dns
.message
.make_response(query
)
845 # we prepare a large answer
849 content
= content
+ ', '
850 content
= content
+ (str(i
)*50)
852 content
= content
+ 'A'*40
854 rrset
= dns
.rrset
.from_text(name
,
859 response
.answer
.append(rrset
)
860 self
.assertEquals(len(response
.to_wire()), 4096)
862 # first query to fill the cache
863 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
864 self
.assertTrue(receivedQuery
)
865 self
.assertTrue(receivedResponse
)
866 receivedQuery
.id = expectedQuery
.id
867 self
.assertEquals(expectedQuery
, receivedQuery
)
868 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
869 self
.assertEquals(response
, receivedResponse
)
870 self
.checkHasHeader('cache-control', 'max-age=3600')
872 for _
in range(numberOfQueries
):
873 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False)
874 self
.assertEquals(receivedResponse
, response
)
875 self
.checkHasHeader('cache-control', 'max-age=' + str(receivedResponse
.answer
[0].ttl
))
879 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False)
880 self
.assertEquals(receivedResponse
, response
)
881 self
.checkHasHeader('cache-control', 'max-age=' + str(receivedResponse
.answer
[0].ttl
))
883 class TestDOHWithoutCacheControl(DNSDistDOHTest
):
885 _serverKey
= 'server.key'
886 _serverCert
= 'server.chain'
887 _serverName
= 'tls.tests.dnsdist.org'
889 _dohServerPort
= 8443
890 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
891 _config_template
= """
892 newServer{address="127.0.0.1:%s"}
894 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {sendCacheControlHeaders=false})
896 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
898 def testDOHSimple(self
):
900 DOH without cache-control
902 name
= 'simple.doh.tests.powerdns.com.'
903 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
905 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
907 response
= dns
.message
.make_response(query
)
908 rrset
= dns
.rrset
.from_text(name
,
913 response
.answer
.append(rrset
)
915 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
916 self
.assertTrue(receivedQuery
)
917 self
.assertTrue(receivedResponse
)
918 receivedQuery
.id = expectedQuery
.id
919 self
.assertEquals(expectedQuery
, receivedQuery
)
920 self
.checkNoHeader('cache-control')
921 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
922 self
.assertEquals(response
, receivedResponse
)