]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/dnsdistdohtests.py
7 from dnsdisttests
import DNSDistTest
10 from io
import BytesIO
12 @unittest.skipIf('SKIP_DOH_TESTS' in os
.environ
, 'DNS over HTTPS tests are disabled')
13 class DNSDistDOHTest(DNSDistTest
):
def getDOHGetURL(cls, baseurl, query, rawQuery=False):
    """Build the URL for a DoH GET request carrying ``query``.

    The wire-format query is base64url-encoded, its ``=`` padding is
    stripped, and the result is appended to ``baseurl`` as the ``dns``
    query-string parameter.

    Args:
        baseurl: URL of the DoH endpoint, without a query string.
        query: object exposing ``to_wire()``, or, when ``rawQuery`` is
            true, the already-serialized query bytes.
        rawQuery: when true, ``query`` is used as-is instead of being
            serialized with ``to_wire()``.

    Returns:
        ``baseurl`` followed by ``?dns=<unpadded base64url payload>``.

    NOTE(review): the statements between the signature and the
    ``to_wire()`` call are missing from this copy of the file; the
    ``rawQuery`` branch below is inferred from the parameter -- confirm
    against upstream. The default (``rawQuery=False``) path is unchanged.
    """
    # With rawQuery the caller supplies the bytes to send, so there is
    # nothing to serialize.
    wire = query if rawQuery else query.to_wire()
    param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
    return baseurl + "?dns=" + param
25 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
27 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
29 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
30 "Accept: application/dns-message"])
34 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True, fromQueue
=None, toQueue
=None):
35 url
= cls
.getDOHGetURL(baseurl
, query
, rawQuery
)
36 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
37 response_headers
= BytesIO()
38 #conn.setopt(pycurl.VERBOSE, True)
39 conn
.setopt(pycurl
.URL
, url
)
40 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
42 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
43 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
45 conn
.setopt(pycurl
.CAINFO
, caFile
)
47 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
48 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
52 toQueue
.put(response
, True, timeout
)
54 cls
._toResponderQueue
.put(response
, True, timeout
)
58 cls
._response
_headers
= ''
59 data
= conn
.perform_rb()
60 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
61 if cls
._rcode
== 200 and not rawResponse
:
62 message
= dns
.message
.from_wire(data
)
68 if not fromQueue
.empty():
69 receivedQuery
= fromQueue
.get(True, timeout
)
71 if not cls
._fromResponderQueue
.empty():
72 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
74 cls
._response
_headers
= response_headers
.getvalue()
75 return (receivedQuery
, message
)
78 def sendDOHPostQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
80 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
81 response_headers
= BytesIO()
82 #conn.setopt(pycurl.VERBOSE, True)
83 conn
.setopt(pycurl
.URL
, url
)
84 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
86 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
87 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
89 conn
.setopt(pycurl
.CAINFO
, caFile
)
91 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
92 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
93 conn
.setopt(pycurl
.POST
, True)
98 conn
.setopt(pycurl
.POSTFIELDS
, data
)
101 cls
._toResponderQueue
.put(response
, True, timeout
)
105 cls
._response
_headers
= ''
106 data
= conn
.perform_rb()
107 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
108 if cls
._rcode
== 200 and not rawResponse
:
109 message
= dns
.message
.from_wire(data
)
113 if useQueue
and not cls
._fromResponderQueue
.empty():
114 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
116 cls
._response
_headers
= response_headers
.getvalue()
117 return (receivedQuery
, message
)
def getHeaderValue(self, name):
    """Return the value of response header ``name``, or None if absent.

    Scans the header block stored in ``self._response_headers`` line by
    line. Header names are compared case-insensitively, the first match
    wins, and surrounding whitespace is stripped from the returned value.

    Args:
        name: header name to look for.

    Returns:
        The header value as a str, or None when no line carries that name.
    """
    raw = self._response_headers
    # The send helpers reset this attribute to '' (a str) before the
    # request is performed and only store bytes afterwards, so both
    # types have to be accepted here.
    text = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw
    wanted = name.lower()
    for header in text.splitlines(False):
        # Split on the first ':' only, so values that themselves contain
        # colons (URLs, host:port pairs, dates) are returned whole.
        key, sep, value = header.partition(':')
        if not sep:
            # Status line or the blank line ending the block: no header.
            continue
        if key.lower() == wanted:
            return value.strip()
    return None
def checkHasHeader(self, name, value):
    """Assert that response header ``name`` equals ``value``.

    The header is looked up with ``self.getHeaderValue(name)`` and the
    result is compared to ``value`` with ``self.assertEqual``.
    """
    self.assertEqual(self.getHeaderValue(name), value)
def checkNoHeader(self, name):
    """Assert that the lookup of response header ``name`` yields None.

    Delegates to ``self.checkHasHeader`` with None as the expected value.
    """
    expected = None
    self.checkHasHeader(name, expected)
137 # for some reason, @unittest.skipIf() is not applied to derived classes with some versions of Python
138 if 'SKIP_DOH_TESTS' in os
.environ
:
139 raise unittest
.SkipTest('DNS over HTTPS tests are disabled')
141 cls
.startResponders()
145 print("Launching tests..")