]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
12 class DNSDistDOHTest(DNSDistTest
):
15 def getDOHGetURL(cls
, baseurl
, query
, rawQuery
=False):
19 wire
= query
.to_wire()
20 param
= base64
.urlsafe_b64encode(wire
).decode('UTF8').rstrip('=')
21 return baseurl
+ "?dns=" + param
24 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
26 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
28 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
29 "Accept: application/dns-message"])
33 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[]):
34 url
= cls
.getDOHGetURL(baseurl
, query
, rawQuery
)
35 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
36 response_headers
= BytesIO()
37 #conn.setopt(pycurl.VERBOSE, True)
38 conn
.setopt(pycurl
.URL
, url
)
39 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
40 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
41 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
42 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
43 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
45 conn
.setopt(pycurl
.CAINFO
, caFile
)
48 cls
._toResponderQueue
.put(response
, True, timeout
)
52 cls
._response
_headers
= ''
53 data
= conn
.perform_rb()
54 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
55 if cls
._rcode
== 200 and not rawResponse
:
56 message
= dns
.message
.from_wire(data
)
60 if useQueue
and not cls
._fromResponderQueue
.empty():
61 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
63 cls
._response
_headers
= response_headers
.getvalue()
64 return (receivedQuery
, message
)
67 def sendDOHPostQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[]):
69 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
70 response_headers
= BytesIO()
71 #conn.setopt(pycurl.VERBOSE, True)
72 conn
.setopt(pycurl
.URL
, url
)
73 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
74 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
75 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
76 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
77 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
78 conn
.setopt(pycurl
.POST
, True)
83 conn
.setopt(pycurl
.POSTFIELDS
, data
)
86 conn
.setopt(pycurl
.CAINFO
, caFile
)
89 cls
._toResponderQueue
.put(response
, True, timeout
)
93 cls
._response
_headers
= ''
94 data
= conn
.perform_rb()
95 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
96 if cls
._rcode
== 200 and not rawResponse
:
97 message
= dns
.message
.from_wire(data
)
101 if useQueue
and not cls
._fromResponderQueue
.empty():
102 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
104 cls
._response
_headers
= response_headers
.getvalue()
105 return (receivedQuery
, message
)
108 # def openDOHConnection(cls, port, caFile, timeout=2.0):
109 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
110 # sslctx.load_verify_locations(caFile)
111 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
114 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
115 # url = cls.getDOHGetURL(baseurl, query)
118 # cls._toResponderQueue.put(response, True, timeout)
120 # conn.request('GET', url)
123 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
125 # data = conn.get_response()
129 # message = dns.message.from_wire(data)
131 # if useQueue and not cls._fromResponderQueue.empty():
132 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
133 # return (receivedQuery, message)
137 class TestDOH(DNSDistDOHTest
):
139 _serverKey
= 'server.key'
140 _serverCert
= 'server.chain'
141 _serverName
= 'tls.tests.dnsdist.org'
143 _dohServerPort
= 8443
144 _customResponseHeader1
= 'access-control-allow-origin: *'
145 _customResponseHeader2
= 'user-agent: derp'
146 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
147 _config_template
= """
148 newServer{address="127.0.0.1:%s"}
150 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp"}})
152 addAction("drop.doh.tests.powerdns.com.", DropAction())
153 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
154 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
155 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
156 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
157 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
158 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
159 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
161 function dohHandler(dq)
162 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
163 local foundct = false
164 for key,value in pairs(dq:getHTTPHeaders()) do
165 if key == 'content-type' and value == 'application/dns-message' then
171 dq:setHTTPResponse(200, 'It works!', 'text/plain')
173 return DNSAction.HeaderModify
176 return DNSAction.None
178 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
180 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
182 def testDOHSimple(self
):
186 name
= 'simple.doh.tests.powerdns.com.'
187 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
189 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
191 response
= dns
.message
.make_response(query
)
192 rrset
= dns
.rrset
.from_text(name
,
197 response
.answer
.append(rrset
)
199 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
200 self
.assertTrue(receivedQuery
)
201 self
.assertTrue(receivedResponse
)
202 receivedQuery
.id = expectedQuery
.id
203 self
.assertEquals(expectedQuery
, receivedQuery
)
204 self
.assertTrue((self
._customResponseHeader
1) in self
._response
_headers
.decode())
205 self
.assertTrue((self
._customResponseHeader
2) in self
._response
_headers
.decode())
206 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
207 self
.assertEquals(response
, receivedResponse
)
209 def testDOHSimplePOST(self
):
211 DOH: Simple POST query
213 name
= 'simple-post.doh.tests.powerdns.com.'
214 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
216 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
218 response
= dns
.message
.make_response(query
)
219 rrset
= dns
.rrset
.from_text(name
,
224 response
.answer
.append(rrset
)
226 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
227 self
.assertTrue(receivedQuery
)
228 self
.assertTrue(receivedResponse
)
229 receivedQuery
.id = expectedQuery
.id
230 self
.assertEquals(expectedQuery
, receivedQuery
)
231 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
232 self
.assertEquals(response
, receivedResponse
)
234 def testDOHExistingEDNS(self
):
238 name
= 'existing-edns.doh.tests.powerdns.com.'
239 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
241 response
= dns
.message
.make_response(query
)
242 rrset
= dns
.rrset
.from_text(name
,
247 response
.answer
.append(rrset
)
249 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
250 self
.assertTrue(receivedQuery
)
251 self
.assertTrue(receivedResponse
)
252 receivedQuery
.id = query
.id
253 self
.assertEquals(query
, receivedQuery
)
254 self
.assertEquals(response
, receivedResponse
)
255 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
256 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
258 def testDOHExistingECS(self
):
260 DOH: Existing EDNS Client Subnet
262 name
= 'existing-ecs.doh.tests.powerdns.com.'
263 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
264 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
265 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
267 response
= dns
.message
.make_response(query
)
268 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
269 rrset
= dns
.rrset
.from_text(name
,
274 response
.answer
.append(rrset
)
276 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
277 self
.assertTrue(receivedQuery
)
278 self
.assertTrue(receivedResponse
)
279 receivedQuery
.id = query
.id
280 self
.assertEquals(query
, receivedQuery
)
281 self
.assertEquals(response
, receivedResponse
)
282 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
283 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
285 def testDropped(self
):
289 name
= 'drop.doh.tests.powerdns.com.'
290 query
= dns
.message
.make_query(name
, 'A', 'IN')
291 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
292 self
.assertEquals(receivedResponse
, None)
294 def testRefused(self
):
298 name
= 'refused.doh.tests.powerdns.com.'
299 query
= dns
.message
.make_query(name
, 'A', 'IN')
301 expectedResponse
= dns
.message
.make_response(query
)
302 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
304 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
305 self
.assertEquals(receivedResponse
, expectedResponse
)
311 name
= 'spoof.doh.tests.powerdns.com.'
312 query
= dns
.message
.make_query(name
, 'A', 'IN')
314 query
.flags
&= ~dns
.flags
.RD
315 expectedResponse
= dns
.message
.make_response(query
)
316 rrset
= dns
.rrset
.from_text(name
,
321 expectedResponse
.answer
.append(rrset
)
323 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
324 self
.assertEquals(receivedResponse
, expectedResponse
)
326 def testDOHInvalid(self
):
330 name
= 'invalid.doh.tests.powerdns.com.'
331 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
333 # first an invalid query
334 invalidQuery
= invalidQuery
.to_wire()
335 invalidQuery
= invalidQuery
[:-5]
336 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
337 self
.assertEquals(receivedResponse
, None)
339 # and now a valid one
340 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
342 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
344 response
= dns
.message
.make_response(query
)
345 rrset
= dns
.rrset
.from_text(name
,
350 response
.answer
.append(rrset
)
351 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
352 self
.assertTrue(receivedQuery
)
353 self
.assertTrue(receivedResponse
)
354 receivedQuery
.id = expectedQuery
.id
355 self
.assertEquals(expectedQuery
, receivedQuery
)
356 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
357 self
.assertEquals(response
, receivedResponse
)
359 def testDOHWithoutQuery(self
):
363 name
= 'empty-get.doh.tests.powerdns.com.'
364 url
= self
._dohBaseURL
365 conn
= self
.openDOHConnection(self
._dohServerPort
, self
._caCert
, timeout
=2.0)
366 conn
.setopt(pycurl
.URL
, url
)
367 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
368 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
369 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
370 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
371 data
= conn
.perform_rb()
372 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
373 self
.assertEquals(rcode
, 400)
375 def testDOHEmptyPOST(self
):
377 DOH: Empty POST query
379 name
= 'empty-post.doh.tests.powerdns.com.'
381 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
="", rawQuery
=True, response
=None, caFile
=self
._caCert
)
382 self
.assertEquals(receivedResponse
, None)
384 # and now a valid one
385 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
387 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
389 response
= dns
.message
.make_response(query
)
390 rrset
= dns
.rrset
.from_text(name
,
395 response
.answer
.append(rrset
)
396 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
397 self
.assertTrue(receivedQuery
)
398 self
.assertTrue(receivedResponse
)
399 receivedQuery
.id = expectedQuery
.id
400 self
.assertEquals(expectedQuery
, receivedQuery
)
401 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
402 self
.assertEquals(response
, receivedResponse
)
404 def testHeaderRule(self
):
408 name
= 'header-rule.doh.tests.powerdns.com.'
409 query
= dns
.message
.make_query(name
, 'A', 'IN')
411 query
.flags
&= ~dns
.flags
.RD
412 expectedResponse
= dns
.message
.make_response(query
)
413 rrset
= dns
.rrset
.from_text(name
,
418 expectedResponse
.answer
.append(rrset
)
420 # this header should match
421 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False, customHeaders
=['x-powerdnS: aaaaa'])
422 self
.assertEquals(receivedResponse
, expectedResponse
)
424 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
425 expectedQuery
.flags
&= ~dns
.flags
.RD
427 response
= dns
.message
.make_response(query
)
428 rrset
= dns
.rrset
.from_text(name
,
433 response
.answer
.append(rrset
)
435 # this content of the header should NOT match
436 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
, customHeaders
=['x-powerdnS: bbbbb'])
437 self
.assertTrue(receivedQuery
)
438 self
.assertTrue(receivedResponse
)
439 receivedQuery
.id = expectedQuery
.id
440 self
.assertEquals(expectedQuery
, receivedQuery
)
441 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
442 self
.assertEquals(response
, receivedResponse
)
444 def testHTTPPath(self
):
448 name
= 'http-path.doh.tests.powerdns.com.'
449 query
= dns
.message
.make_query(name
, 'A', 'IN')
451 query
.flags
&= ~dns
.flags
.RD
452 expectedResponse
= dns
.message
.make_response(query
)
453 rrset
= dns
.rrset
.from_text(name
,
458 expectedResponse
.answer
.append(rrset
)
460 # this path should match
461 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
462 self
.assertEquals(receivedResponse
, expectedResponse
)
464 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
466 expectedQuery
.flags
&= ~dns
.flags
.RD
467 response
= dns
.message
.make_response(query
)
468 rrset
= dns
.rrset
.from_text(name
,
473 response
.answer
.append(rrset
)
475 # this path should NOT match
476 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
477 self
.assertTrue(receivedQuery
)
478 self
.assertTrue(receivedResponse
)
479 receivedQuery
.id = expectedQuery
.id
480 self
.assertEquals(expectedQuery
, receivedQuery
)
481 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
482 self
.assertEquals(response
, receivedResponse
)
484 def testHTTPPathRegex(self
):
488 name
= 'http-path-regex.doh.tests.powerdns.com.'
489 query
= dns
.message
.make_query(name
, 'A', 'IN')
491 query
.flags
&= ~dns
.flags
.RD
492 expectedResponse
= dns
.message
.make_response(query
)
493 rrset
= dns
.rrset
.from_text(name
,
498 expectedResponse
.answer
.append(rrset
)
500 # this path should match
501 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS-999', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
502 self
.assertEquals(receivedResponse
, expectedResponse
)
504 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
506 expectedQuery
.flags
&= ~dns
.flags
.RD
507 response
= dns
.message
.make_response(query
)
508 rrset
= dns
.rrset
.from_text(name
,
513 response
.answer
.append(rrset
)
515 # this path should NOT match
516 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
517 self
.assertTrue(receivedQuery
)
518 self
.assertTrue(receivedResponse
)
519 receivedQuery
.id = expectedQuery
.id
520 self
.assertEquals(expectedQuery
, receivedQuery
)
521 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
522 self
.assertEquals(response
, receivedResponse
)
524 def testHTTPStatusAction200(self
):
526 DOH: HTTPStatusAction 200 OK
528 name
= 'http-status-action.doh.tests.powerdns.com.'
529 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
532 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
533 self
.assertTrue(receivedResponse
)
534 self
.assertEquals(receivedResponse
, b
'Plaintext answer')
535 self
.assertEquals(self
._rcode
, 200)
536 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
538 def testHTTPStatusAction307(self
):
540 DOH: HTTPStatusAction 307
542 name
= 'http-status-action-redirect.doh.tests.powerdns.com.'
543 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
546 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
547 self
.assertTrue(receivedResponse
)
548 self
.assertEquals(self
._rcode
, 307)
549 self
.assertTrue('location: https://doh.powerdns.org' in self
._response
_headers
.decode())
551 def testHTTPLuaResponse(self
):
553 DOH: Lua HTTP Response
555 name
= 'http-lua.doh.tests.powerdns.com.'
556 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
559 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
560 self
.assertTrue(receivedResponse
)
561 self
.assertEquals(receivedResponse
, b
'It works!')
562 self
.assertEquals(self
._rcode
, 200)
563 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
565 class TestDOHAddingECS(DNSDistDOHTest
):
567 _serverKey
= 'server.key'
568 _serverCert
= 'server.chain'
569 _serverName
= 'tls.tests.dnsdist.org'
571 _dohServerPort
= 8443
572 _serverName
= 'tls.tests.dnsdist.org'
573 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
574 _config_template
= """
575 newServer{address="127.0.0.1:%s", useClientSubnet=true}
576 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
579 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
581 def testDOHSimple(self
):
583 DOH with ECS: Simple query
585 name
= 'simple.doh-ecs.tests.powerdns.com.'
586 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
588 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
589 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
590 response
= dns
.message
.make_response(query
)
591 rrset
= dns
.rrset
.from_text(name
,
596 response
.answer
.append(rrset
)
598 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
599 self
.assertTrue(receivedQuery
)
600 self
.assertTrue(receivedResponse
)
601 expectedQuery
.id = receivedQuery
.id
602 self
.assertEquals(expectedQuery
, receivedQuery
)
603 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
604 self
.assertEquals(response
, receivedResponse
)
605 self
.checkResponseNoEDNS(response
, receivedResponse
)
607 def testDOHExistingEDNS(self
):
609 DOH with ECS: Existing EDNS
611 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
612 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
614 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
615 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
616 response
= dns
.message
.make_response(query
)
617 rrset
= dns
.rrset
.from_text(name
,
622 response
.answer
.append(rrset
)
624 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
625 self
.assertTrue(receivedQuery
)
626 self
.assertTrue(receivedResponse
)
627 receivedQuery
.id = expectedQuery
.id
628 self
.assertEquals(expectedQuery
, receivedQuery
)
629 self
.assertEquals(response
, receivedResponse
)
630 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
631 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
633 def testDOHExistingECS(self
):
635 DOH with ECS: Existing EDNS Client Subnet
637 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
638 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
639 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
640 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
642 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
643 response
= dns
.message
.make_response(query
)
644 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
645 rrset
= dns
.rrset
.from_text(name
,
650 response
.answer
.append(rrset
)
652 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
653 self
.assertTrue(receivedQuery
)
654 self
.assertTrue(receivedResponse
)
655 receivedQuery
.id = expectedQuery
.id
656 self
.assertEquals(expectedQuery
, receivedQuery
)
657 self
.assertEquals(response
, receivedResponse
)
658 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
659 self
.checkResponseEDNSWithECS(response
, receivedResponse
)