]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
12 class DNSDistDOHTest(DNSDistTest
):
15 def getDOHGetURL(cls
, baseurl
, query
, rawQuery
=False):
19 wire
= query
.to_wire()
20 param
= base64
.urlsafe_b64encode(wire
).decode('UTF8').rstrip('=')
21 return baseurl
+ "?dns=" + param
24 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
26 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
28 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
29 "Accept: application/dns-message"])
33 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
34 url
= cls
.getDOHGetURL(baseurl
, query
, rawQuery
)
35 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
36 response_headers
= BytesIO()
37 #conn.setopt(pycurl.VERBOSE, True)
38 conn
.setopt(pycurl
.URL
, url
)
39 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
41 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
42 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
44 conn
.setopt(pycurl
.CAINFO
, caFile
)
46 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
47 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
50 cls
._toResponderQueue
.put(response
, True, timeout
)
54 cls
._response
_headers
= ''
55 data
= conn
.perform_rb()
56 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
57 if cls
._rcode
== 200 and not rawResponse
:
58 message
= dns
.message
.from_wire(data
)
62 if useQueue
and not cls
._fromResponderQueue
.empty():
63 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
65 cls
._response
_headers
= response_headers
.getvalue()
66 return (receivedQuery
, message
)
69 def sendDOHPostQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
71 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
72 response_headers
= BytesIO()
73 #conn.setopt(pycurl.VERBOSE, True)
74 conn
.setopt(pycurl
.URL
, url
)
75 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
77 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
78 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
80 conn
.setopt(pycurl
.CAINFO
, caFile
)
82 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
83 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
84 conn
.setopt(pycurl
.POST
, True)
89 conn
.setopt(pycurl
.POSTFIELDS
, data
)
92 cls
._toResponderQueue
.put(response
, True, timeout
)
96 cls
._response
_headers
= ''
97 data
= conn
.perform_rb()
98 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
99 if cls
._rcode
== 200 and not rawResponse
:
100 message
= dns
.message
.from_wire(data
)
104 if useQueue
and not cls
._fromResponderQueue
.empty():
105 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
107 cls
._response
_headers
= response_headers
.getvalue()
108 return (receivedQuery
, message
)
111 # def openDOHConnection(cls, port, caFile, timeout=2.0):
112 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
113 # sslctx.load_verify_locations(caFile)
114 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
117 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
118 # url = cls.getDOHGetURL(baseurl, query)
121 # cls._toResponderQueue.put(response, True, timeout)
123 # conn.request('GET', url)
126 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
128 # data = conn.get_response()
132 # message = dns.message.from_wire(data)
134 # if useQueue and not cls._fromResponderQueue.empty():
135 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
136 # return (receivedQuery, message)
140 class TestDOH(DNSDistDOHTest
):
142 _serverKey
= 'server.key'
143 _serverCert
= 'server.chain'
144 _serverName
= 'tls.tests.dnsdist.org'
146 _dohServerPort
= 8443
147 _customResponseHeader1
= 'access-control-allow-origin: *'
148 _customResponseHeader2
= 'user-agent: derp'
149 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
150 _config_template
= """
151 newServer{address="127.0.0.1:%s"}
153 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})
154 dohFE = getDOHFrontend(0)
155 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['foo']='bar'})})
157 addAction("drop.doh.tests.powerdns.com.", DropAction())
158 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
159 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
160 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
161 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
162 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
163 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
164 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
166 function dohHandler(dq)
167 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
168 local foundct = false
169 for key,value in pairs(dq:getHTTPHeaders()) do
170 if key == 'content-type' and value == 'application/dns-message' then
176 dq:setHTTPResponse(200, 'It works!', 'text/plain')
178 return DNSAction.HeaderModify
181 return DNSAction.None
183 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
185 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
187 def testDOHSimple(self
):
191 name
= 'simple.doh.tests.powerdns.com.'
192 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
194 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
196 response
= dns
.message
.make_response(query
)
197 rrset
= dns
.rrset
.from_text(name
,
202 response
.answer
.append(rrset
)
204 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
205 self
.assertTrue(receivedQuery
)
206 self
.assertTrue(receivedResponse
)
207 receivedQuery
.id = expectedQuery
.id
208 self
.assertEquals(expectedQuery
, receivedQuery
)
209 self
.assertTrue((self
._customResponseHeader
1) in self
._response
_headers
.decode())
210 self
.assertTrue((self
._customResponseHeader
2) in self
._response
_headers
.decode())
211 self
.assertFalse(('UPPERCASE: VaLuE' in self
._response
_headers
.decode()))
212 self
.assertTrue(('uppercase: VaLuE' in self
._response
_headers
.decode()))
213 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
214 self
.assertEquals(response
, receivedResponse
)
216 def testDOHSimplePOST(self
):
218 DOH: Simple POST query
220 name
= 'simple-post.doh.tests.powerdns.com.'
221 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
223 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
225 response
= dns
.message
.make_response(query
)
226 rrset
= dns
.rrset
.from_text(name
,
231 response
.answer
.append(rrset
)
233 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
234 self
.assertTrue(receivedQuery
)
235 self
.assertTrue(receivedResponse
)
236 receivedQuery
.id = expectedQuery
.id
237 self
.assertEquals(expectedQuery
, receivedQuery
)
238 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
239 self
.assertEquals(response
, receivedResponse
)
241 def testDOHExistingEDNS(self
):
245 name
= 'existing-edns.doh.tests.powerdns.com.'
246 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
248 response
= dns
.message
.make_response(query
)
249 rrset
= dns
.rrset
.from_text(name
,
254 response
.answer
.append(rrset
)
256 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
257 self
.assertTrue(receivedQuery
)
258 self
.assertTrue(receivedResponse
)
259 receivedQuery
.id = query
.id
260 self
.assertEquals(query
, receivedQuery
)
261 self
.assertEquals(response
, receivedResponse
)
262 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
263 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
265 def testDOHExistingECS(self
):
267 DOH: Existing EDNS Client Subnet
269 name
= 'existing-ecs.doh.tests.powerdns.com.'
270 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
271 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
272 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
274 response
= dns
.message
.make_response(query
)
275 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
276 rrset
= dns
.rrset
.from_text(name
,
281 response
.answer
.append(rrset
)
283 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
284 self
.assertTrue(receivedQuery
)
285 self
.assertTrue(receivedResponse
)
286 receivedQuery
.id = query
.id
287 self
.assertEquals(query
, receivedQuery
)
288 self
.assertEquals(response
, receivedResponse
)
289 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
290 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
292 def testDropped(self
):
296 name
= 'drop.doh.tests.powerdns.com.'
297 query
= dns
.message
.make_query(name
, 'A', 'IN')
298 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
299 self
.assertEquals(receivedResponse
, None)
301 def testRefused(self
):
305 name
= 'refused.doh.tests.powerdns.com.'
306 query
= dns
.message
.make_query(name
, 'A', 'IN')
308 expectedResponse
= dns
.message
.make_response(query
)
309 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
311 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
312 self
.assertEquals(receivedResponse
, expectedResponse
)
318 name
= 'spoof.doh.tests.powerdns.com.'
319 query
= dns
.message
.make_query(name
, 'A', 'IN')
321 query
.flags
&= ~dns
.flags
.RD
322 expectedResponse
= dns
.message
.make_response(query
)
323 rrset
= dns
.rrset
.from_text(name
,
328 expectedResponse
.answer
.append(rrset
)
330 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
331 self
.assertEquals(receivedResponse
, expectedResponse
)
333 def testDOHInvalid(self
):
337 name
= 'invalid.doh.tests.powerdns.com.'
338 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
340 # first an invalid query
341 invalidQuery
= invalidQuery
.to_wire()
342 invalidQuery
= invalidQuery
[:-5]
343 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
344 self
.assertEquals(receivedResponse
, None)
346 # and now a valid one
347 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
349 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
351 response
= dns
.message
.make_response(query
)
352 rrset
= dns
.rrset
.from_text(name
,
357 response
.answer
.append(rrset
)
358 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
359 self
.assertTrue(receivedQuery
)
360 self
.assertTrue(receivedResponse
)
361 receivedQuery
.id = expectedQuery
.id
362 self
.assertEquals(expectedQuery
, receivedQuery
)
363 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
364 self
.assertEquals(response
, receivedResponse
)
366 def testDOHWithoutQuery(self
):
370 name
= 'empty-get.doh.tests.powerdns.com.'
371 url
= self
._dohBaseURL
372 conn
= self
.openDOHConnection(self
._dohServerPort
, self
._caCert
, timeout
=2.0)
373 conn
.setopt(pycurl
.URL
, url
)
374 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
375 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
376 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
377 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
378 data
= conn
.perform_rb()
379 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
380 self
.assertEquals(rcode
, 400)
382 def testDOHEmptyPOST(self
):
384 DOH: Empty POST query
386 name
= 'empty-post.doh.tests.powerdns.com.'
388 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
="", rawQuery
=True, response
=None, caFile
=self
._caCert
)
389 self
.assertEquals(receivedResponse
, None)
391 # and now a valid one
392 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
394 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
396 response
= dns
.message
.make_response(query
)
397 rrset
= dns
.rrset
.from_text(name
,
402 response
.answer
.append(rrset
)
403 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
404 self
.assertTrue(receivedQuery
)
405 self
.assertTrue(receivedResponse
)
406 receivedQuery
.id = expectedQuery
.id
407 self
.assertEquals(expectedQuery
, receivedQuery
)
408 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
409 self
.assertEquals(response
, receivedResponse
)
411 def testHeaderRule(self
):
415 name
= 'header-rule.doh.tests.powerdns.com.'
416 query
= dns
.message
.make_query(name
, 'A', 'IN')
418 query
.flags
&= ~dns
.flags
.RD
419 expectedResponse
= dns
.message
.make_response(query
)
420 rrset
= dns
.rrset
.from_text(name
,
425 expectedResponse
.answer
.append(rrset
)
427 # this header should match
428 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False, customHeaders
=['x-powerdnS: aaaaa'])
429 self
.assertEquals(receivedResponse
, expectedResponse
)
431 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
432 expectedQuery
.flags
&= ~dns
.flags
.RD
434 response
= dns
.message
.make_response(query
)
435 rrset
= dns
.rrset
.from_text(name
,
440 response
.answer
.append(rrset
)
442 # this content of the header should NOT match
443 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
, customHeaders
=['x-powerdnS: bbbbb'])
444 self
.assertTrue(receivedQuery
)
445 self
.assertTrue(receivedResponse
)
446 receivedQuery
.id = expectedQuery
.id
447 self
.assertEquals(expectedQuery
, receivedQuery
)
448 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
449 self
.assertEquals(response
, receivedResponse
)
451 def testHTTPPath(self
):
455 name
= 'http-path.doh.tests.powerdns.com.'
456 query
= dns
.message
.make_query(name
, 'A', 'IN')
458 query
.flags
&= ~dns
.flags
.RD
459 expectedResponse
= dns
.message
.make_response(query
)
460 rrset
= dns
.rrset
.from_text(name
,
465 expectedResponse
.answer
.append(rrset
)
467 # this path should match
468 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
469 self
.assertEquals(receivedResponse
, expectedResponse
)
471 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
473 expectedQuery
.flags
&= ~dns
.flags
.RD
474 response
= dns
.message
.make_response(query
)
475 rrset
= dns
.rrset
.from_text(name
,
480 response
.answer
.append(rrset
)
482 # this path should NOT match
483 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
484 self
.assertTrue(receivedQuery
)
485 self
.assertTrue(receivedResponse
)
486 receivedQuery
.id = expectedQuery
.id
487 self
.assertEquals(expectedQuery
, receivedQuery
)
488 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
489 self
.assertEquals(response
, receivedResponse
)
491 def testHTTPPathRegex(self
):
495 name
= 'http-path-regex.doh.tests.powerdns.com.'
496 query
= dns
.message
.make_query(name
, 'A', 'IN')
498 query
.flags
&= ~dns
.flags
.RD
499 expectedResponse
= dns
.message
.make_response(query
)
500 rrset
= dns
.rrset
.from_text(name
,
505 expectedResponse
.answer
.append(rrset
)
507 # this path should match
508 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS-999', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
509 self
.assertEquals(receivedResponse
, expectedResponse
)
511 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
513 expectedQuery
.flags
&= ~dns
.flags
.RD
514 response
= dns
.message
.make_response(query
)
515 rrset
= dns
.rrset
.from_text(name
,
520 response
.answer
.append(rrset
)
522 # this path should NOT match
523 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
524 self
.assertTrue(receivedQuery
)
525 self
.assertTrue(receivedResponse
)
526 receivedQuery
.id = expectedQuery
.id
527 self
.assertEquals(expectedQuery
, receivedQuery
)
528 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
529 self
.assertEquals(response
, receivedResponse
)
531 def testHTTPStatusAction200(self
):
533 DOH: HTTPStatusAction 200 OK
535 name
= 'http-status-action.doh.tests.powerdns.com.'
536 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
539 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
540 self
.assertTrue(receivedResponse
)
541 self
.assertEquals(receivedResponse
, b
'Plaintext answer')
542 self
.assertEquals(self
._rcode
, 200)
543 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
545 def testHTTPStatusAction307(self
):
547 DOH: HTTPStatusAction 307
549 name
= 'http-status-action-redirect.doh.tests.powerdns.com.'
550 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
553 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
554 self
.assertTrue(receivedResponse
)
555 self
.assertEquals(self
._rcode
, 307)
556 self
.assertTrue('location: https://doh.powerdns.org' in self
._response
_headers
.decode())
558 def testHTTPLuaResponse(self
):
560 DOH: Lua HTTP Response
562 name
= 'http-lua.doh.tests.powerdns.com.'
563 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
566 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
567 self
.assertTrue(receivedResponse
)
568 self
.assertEquals(receivedResponse
, b
'It works!')
569 self
.assertEquals(self
._rcode
, 200)
570 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
572 def testHTTPEarlyResponse(self
):
574 DOH: HTTP Early Response
576 response_headers
= BytesIO()
577 url
= self
._dohBaseURL
+ 'coffee'
578 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
579 conn
.setopt(pycurl
.URL
, url
)
580 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
581 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
582 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
583 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
584 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
585 data
= conn
.perform_rb()
586 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
587 headers
= response_headers
.getvalue().decode()
589 self
.assertEquals(rcode
, 418)
590 self
.assertEquals(data
, b
'C0FFEE')
591 self
.assertIn('foo: bar', headers
)
592 self
.assertNotIn(self
._customResponseHeader
2, headers
)
594 response_headers
= BytesIO()
595 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
596 conn
.setopt(pycurl
.URL
, url
)
597 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
598 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
599 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
600 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
601 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
602 conn
.setopt(pycurl
.POST
, True)
604 conn
.setopt(pycurl
.POSTFIELDS
, data
)
606 data
= conn
.perform_rb()
607 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
608 headers
= response_headers
.getvalue().decode()
609 self
.assertEquals(rcode
, 418)
610 self
.assertEquals(data
, b
'C0FFEE')
611 self
.assertIn('foo: bar', headers
)
612 self
.assertNotIn(self
._customResponseHeader
2, headers
)
614 class TestDOHAddingECS(DNSDistDOHTest
):
616 _serverKey
= 'server.key'
617 _serverCert
= 'server.chain'
618 _serverName
= 'tls.tests.dnsdist.org'
620 _dohServerPort
= 8443
621 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
622 _config_template
= """
623 newServer{address="127.0.0.1:%s", useClientSubnet=true}
624 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
627 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
629 def testDOHSimple(self
):
631 DOH with ECS: Simple query
633 name
= 'simple.doh-ecs.tests.powerdns.com.'
634 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
636 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
637 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
638 response
= dns
.message
.make_response(query
)
639 rrset
= dns
.rrset
.from_text(name
,
644 response
.answer
.append(rrset
)
646 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
647 self
.assertTrue(receivedQuery
)
648 self
.assertTrue(receivedResponse
)
649 expectedQuery
.id = receivedQuery
.id
650 self
.assertEquals(expectedQuery
, receivedQuery
)
651 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
652 self
.assertEquals(response
, receivedResponse
)
653 self
.checkResponseNoEDNS(response
, receivedResponse
)
655 def testDOHExistingEDNS(self
):
657 DOH with ECS: Existing EDNS
659 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
660 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
662 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
663 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
664 response
= dns
.message
.make_response(query
)
665 rrset
= dns
.rrset
.from_text(name
,
670 response
.answer
.append(rrset
)
672 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
673 self
.assertTrue(receivedQuery
)
674 self
.assertTrue(receivedResponse
)
675 receivedQuery
.id = expectedQuery
.id
676 self
.assertEquals(expectedQuery
, receivedQuery
)
677 self
.assertEquals(response
, receivedResponse
)
678 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
679 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
681 def testDOHExistingECS(self
):
683 DOH with ECS: Existing EDNS Client Subnet
685 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
686 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
687 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
688 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
690 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
691 response
= dns
.message
.make_response(query
)
692 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
693 rrset
= dns
.rrset
.from_text(name
,
698 response
.answer
.append(rrset
)
700 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
701 self
.assertTrue(receivedQuery
)
702 self
.assertTrue(receivedResponse
)
703 receivedQuery
.id = expectedQuery
.id
704 self
.assertEquals(expectedQuery
, receivedQuery
)
705 self
.assertEquals(response
, receivedResponse
)
706 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
707 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
709 class TestDOHOverHTTP(DNSDistDOHTest
):
711 _dohServerPort
= 8480
712 _serverName
= 'tls.tests.dnsdist.org'
713 _dohBaseURL
= ("http://%s:%d/" % (_serverName
, _dohServerPort
))
714 _config_template
= """
715 newServer{address="127.0.0.1:%s"}
716 addDOHLocal("127.0.0.1:%s")
718 _config_params
= ['_testServerPort', '_dohServerPort']
720 def testDOHSimple(self
):
722 DOH over HTTP: Simple query
724 name
= 'simple.doh-over-http.tests.powerdns.com.'
725 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
727 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
728 response
= dns
.message
.make_response(query
)
729 rrset
= dns
.rrset
.from_text(name
,
734 response
.answer
.append(rrset
)
736 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
737 self
.assertTrue(receivedQuery
)
738 self
.assertTrue(receivedResponse
)
739 expectedQuery
.id = receivedQuery
.id
740 self
.assertEquals(expectedQuery
, receivedQuery
)
741 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
742 self
.assertEquals(response
, receivedResponse
)
743 self
.checkResponseNoEDNS(response
, receivedResponse
)
745 def testDOHSimplePOST(self
):
747 DOH over HTTP: Simple POST query
749 name
= 'simple-post.doh-over-http.tests.powerdns.com.'
750 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
752 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
754 response
= dns
.message
.make_response(query
)
755 rrset
= dns
.rrset
.from_text(name
,
760 response
.answer
.append(rrset
)
762 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
763 self
.assertTrue(receivedQuery
)
764 self
.assertTrue(receivedResponse
)
765 receivedQuery
.id = expectedQuery
.id
766 self
.assertEquals(expectedQuery
, receivedQuery
)
767 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
768 self
.assertEquals(response
, receivedResponse
)
769 self
.checkResponseNoEDNS(response
, receivedResponse
)