]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
Merge pull request #7723 from Habbie/auth-superslave
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 import pycurl
8
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
11
class DNSDistDOHTest(DNSDistTest):
    """Shared helpers for the DNS-over-HTTPS (DoH) regression tests.

    Provides class methods that encode a DNS query into a DoH GET URL and
    send it with pycurl over HTTP/2.
    """

    @classmethod
    def getDOHGetURL(cls, baseurl, query):
        """Return `baseurl` with `query` appended as the ``dns`` parameter.

        The query is serialised to wire format and encoded as URL-safe
        base64 with the trailing ``=`` padding stripped.
        """
        wire = query.to_wire()
        param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
        return baseurl + "?dns=" + param

    @classmethod
    def openDOHConnection(cls, port, caFile, timeout=2.0):
        """Return a new pycurl handle preconfigured for DoH requests.

        The handle requests HTTP/2 and sends the ``application/dns-message``
        Content-type and Accept headers.

        NOTE: `port`, `caFile` and `timeout` are accepted but not used
        here; the caller sets the URL, name resolution and CA file itself.
        The caller owns the returned handle and must close it.
        """
        conn = pycurl.Curl()
        conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)

        conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                        "Accept: application/dns-message"])
        return conn

    @classmethod
    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True):
        """Send `query` as a DoH GET request and return what came back.

        Args:
            port: TCP port `servername` is pinned to on 127.0.0.1.
            servername: host name used in the URL and for TLS verification.
            baseurl: DoH endpoint URL the ``dns`` parameter is appended to.
            query: DNS message to send.
            response: optional DNS message put on the to-responder queue
                before the request is performed.
            timeout: timeout, in seconds, used for the queue operations
                only (it is not applied to the HTTP transfer).
            caFile: optional CA bundle used to verify the server.
            useQueue: whether to look at the from-responder queue for the
                query that reached the backend.

        Returns:
            A ``(receivedQuery, message)`` tuple. ``message`` is the parsed
            DNS response, or None when the HTTP status is not 200.
            ``receivedQuery`` is None when `useQueue` is false or the
            from-responder queue is empty.
        """
        url = cls.getDOHGetURL(baseurl, query)
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        try:
            #conn.setopt(pycurl.VERBOSE, True)
            conn.setopt(pycurl.URL, url)
            # Resolve the server name to the loopback address so that the
            # certificate can still be verified against the real name.
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            if caFile:
                conn.setopt(pycurl.CAINFO, caFile)

            if response:
                cls._toResponderQueue.put(response, True, timeout)

            receivedQuery = None
            message = None
            data = conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            # Release the curl handle even when the transfer raises;
            # previously one handle was leaked per query.
            conn.close()

        if rcode == 200:
            message = dns.message.from_wire(data)

        if useQueue and not cls._fromResponderQueue.empty():
            receivedQuery = cls._fromResponderQueue.get(True, timeout)

        return (receivedQuery, message)
55
56 # @classmethod
57 # def openDOHConnection(cls, port, caFile, timeout=2.0):
58 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
59 # sslctx.load_verify_locations(caFile)
60 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
61
62 # @classmethod
63 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
64 # url = cls.getDOHGetURL(baseurl, query)
65
66 # if response:
67 # cls._toResponderQueue.put(response, True, timeout)
68
69 # conn.request('GET', url)
70
71 # @classmethod
72 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
73 # message = None
74 # data = conn.get_response()
75 # if data:
76 # data = data.read()
77 # if data:
78 # message = dns.message.from_wire(data)
79
80 # if useQueue and not cls._fromResponderQueue.empty():
81 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
82 # return (receivedQuery, message)
83 # else:
84 # return message
85
class TestDOH(DNSDistDOHTest):
    """DoH regression tests against a dnsdist with a single backend.

    The configuration also installs rules that drop, refuse or spoof the
    answer for three dedicated names.
    """

    # TLS material and DoH listener settings.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })

    addAction("drop.doh.tests.powerdns.com.", DropAction())
    addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    """
    # Attribute names, in the order of the %s placeholders above
    # (presumably substituted by the base class -- not visible here).
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH: Simple query

        A query sent without EDNS is expected to reach the backend with
        EDNS enabled and a 4096 payload size; the backend's response must
        be returned unchanged.
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the ID so that it does not influence the comparison.
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH: Existing EDNS

        A query that already uses EDNS (payload size 8192) is expected to
        reach the backend unchanged, and the response to come back
        unchanged.
        """
        name = 'existing-edns.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithoutECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH: Existing EDNS Client Subnet

        A query that already carries an EDNS Client Subnet option is
        expected to reach the backend unchanged, and the response carrying
        an ECS option to come back unchanged.
        """
        name = 'existing-ecs.doh.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)

    def testDropped(self):
        """
        DOH: Dropped query

        No DNS message is expected back for a name matched by the drop
        rule; the responder queue is not consulted.
        """
        name = 'drop.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, None)

    def testRefused(self):
        """
        DOH: Refused

        A REFUSED response is expected for a name matched by the refused
        rule; the responder queue is not consulted.
        """
        name = 'refused.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOH: Spoofed

        An A record for 1.2.3.4 is expected for a name matched by the
        spoof rule; the responder queue is not consulted.
        """
        name = 'spoof.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        # Clear RD on the query; the expected response is derived from it.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
222
223
class TestDOHAddingECS(DNSDistDOHTest):
    """DoH regression tests with EDNS Client Subnet handling enabled.

    The backend is declared with ``useClientSubnet=true`` and
    ``setECSOverride(true)`` is set in the configuration.
    """

    # TLS material and DoH listener settings.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    setECSOverride(true)
    """
    # Attribute names, in the order of the %s placeholders above
    # (presumably substituted by the base class -- not visible here).
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH with ECS: Simple query

        A query sent without EDNS is expected to reach the backend with
        EDNS enabled (payload size 4096) and an ECS option for
        127.0.0.0/24; the response must be returned unchanged.
        """
        name = 'simple.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the ID so that it does not influence the comparison.
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH with ECS: Existing EDNS

        A query that already uses EDNS (payload size 8192) is expected to
        reach the backend with an added ECS option for 127.0.0.0/24; the
        response must be returned unchanged.
        """
        name = 'existing-edns.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH with ECS: Existing EDNS Client Subnet

        A query carrying an ECS option for 1.2.3.4 is expected to reach
        the backend with that option replaced by one for 127.0.0.0/24; the
        response carrying an ECS option must be returned unchanged.
        """
        name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)