]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
dnsdist: Add a regression test for an invalid DoH query
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 import pycurl
8
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
11
12 class DNSDistDOHTest(DNSDistTest):
13
14 @classmethod
15 def getDOHGetURL(cls, baseurl, query, rawQuery=False):
16 if rawQuery:
17 wire = query
18 else:
19 wire = query.to_wire()
20 param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
21 return baseurl + "?dns=" + param
22
23 @classmethod
24 def openDOHConnection(cls, port, caFile, timeout=2.0):
25 conn = pycurl.Curl()
26 conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
27
28 conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
29 "Accept: application/dns-message"])
30 return conn
31
32 @classmethod
33 def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False):
34 url = cls.getDOHGetURL(baseurl, query, rawQuery)
35 conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
36 #conn.setopt(pycurl.VERBOSE, True)
37 conn.setopt(pycurl.URL, url)
38 conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
39 conn.setopt(pycurl.SSL_VERIFYPEER, 1)
40 conn.setopt(pycurl.SSL_VERIFYHOST, 2)
41 if caFile:
42 conn.setopt(pycurl.CAINFO, caFile)
43
44 if response:
45 cls._toResponderQueue.put(response, True, timeout)
46
47 receivedQuery = None
48 message = None
49 data = conn.perform_rb()
50 rcode = conn.getinfo(pycurl.RESPONSE_CODE)
51 if rcode == 200:
52 message = dns.message.from_wire(data)
53
54 if useQueue and not cls._fromResponderQueue.empty():
55 receivedQuery = cls._fromResponderQueue.get(True, timeout)
56
57 return (receivedQuery, message)
58
59 # @classmethod
60 # def openDOHConnection(cls, port, caFile, timeout=2.0):
61 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
62 # sslctx.load_verify_locations(caFile)
63 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
64
65 # @classmethod
66 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
67 # url = cls.getDOHGetURL(baseurl, query)
68
69 # if response:
70 # cls._toResponderQueue.put(response, True, timeout)
71
72 # conn.request('GET', url)
73
74 # @classmethod
75 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
76 # message = None
77 # data = conn.get_response()
78 # if data:
79 # data = data.read()
80 # if data:
81 # message = dns.message.from_wire(data)
82
83 # if useQueue and not cls._fromResponderQueue.empty():
84 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
85 # return (receivedQuery, message)
86 # else:
87 # return message
88
89 class TestDOH(DNSDistDOHTest):
90
91 _serverKey = 'server.key'
92 _serverCert = 'server.chain'
93 _serverName = 'tls.tests.dnsdist.org'
94 _caCert = 'ca.pem'
95 _dohServerPort = 8443
96 _serverName = 'tls.tests.dnsdist.org'
97 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
98 _config_template = """
99 newServer{address="127.0.0.1:%s"}
100 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
101
102 addAction("drop.doh.tests.powerdns.com.", DropAction())
103 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
104 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
105 """
106 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
107
108 def testDOHSimple(self):
109 """
110 DOH: Simple query
111 """
112 name = 'simple.doh.tests.powerdns.com.'
113 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
114 query.id = 0
115 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
116 expectedQuery.id = 0
117 response = dns.message.make_response(query)
118 rrset = dns.rrset.from_text(name,
119 3600,
120 dns.rdataclass.IN,
121 dns.rdatatype.A,
122 '127.0.0.1')
123 response.answer.append(rrset)
124
125 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
126 self.assertTrue(receivedQuery)
127 self.assertTrue(receivedResponse)
128 receivedQuery.id = expectedQuery.id
129 self.assertEquals(expectedQuery, receivedQuery)
130 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
131 self.assertEquals(response, receivedResponse)
132
133 def testDOHExistingEDNS(self):
134 """
135 DOH: Existing EDNS
136 """
137 name = 'existing-edns.doh.tests.powerdns.com.'
138 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
139 query.id = 0
140 response = dns.message.make_response(query)
141 rrset = dns.rrset.from_text(name,
142 3600,
143 dns.rdataclass.IN,
144 dns.rdatatype.A,
145 '127.0.0.1')
146 response.answer.append(rrset)
147
148 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
149 self.assertTrue(receivedQuery)
150 self.assertTrue(receivedResponse)
151 receivedQuery.id = query.id
152 self.assertEquals(query, receivedQuery)
153 self.assertEquals(response, receivedResponse)
154 self.checkQueryEDNSWithoutECS(query, receivedQuery)
155 self.checkResponseEDNSWithoutECS(response, receivedResponse)
156
157 def testDOHExistingECS(self):
158 """
159 DOH: Existing EDNS Client Subnet
160 """
161 name = 'existing-ecs.doh.tests.powerdns.com.'
162 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
163 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
164 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
165 query.id = 0
166 response = dns.message.make_response(query)
167 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
168 rrset = dns.rrset.from_text(name,
169 3600,
170 dns.rdataclass.IN,
171 dns.rdatatype.A,
172 '127.0.0.1')
173 response.answer.append(rrset)
174
175 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
176 self.assertTrue(receivedQuery)
177 self.assertTrue(receivedResponse)
178 receivedQuery.id = query.id
179 self.assertEquals(query, receivedQuery)
180 self.assertEquals(response, receivedResponse)
181 self.checkQueryEDNSWithECS(query, receivedQuery)
182 self.checkResponseEDNSWithECS(response, receivedResponse)
183
184 def testDropped(self):
185 """
186 DOH: Dropped query
187 """
188 name = 'drop.doh.tests.powerdns.com.'
189 query = dns.message.make_query(name, 'A', 'IN')
190 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
191 print(receivedResponse)
192 self.assertEquals(receivedResponse, None)
193
194 def testRefused(self):
195 """
196 DOH: Refused
197 """
198 name = 'refused.doh.tests.powerdns.com.'
199 query = dns.message.make_query(name, 'A', 'IN')
200 query.id = 0
201 expectedResponse = dns.message.make_response(query)
202 expectedResponse.set_rcode(dns.rcode.REFUSED)
203
204 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
205 self.assertEquals(receivedResponse, expectedResponse)
206
207 def testSpoof(self):
208 """
209 DOH: Spoofed
210 """
211 name = 'spoof.doh.tests.powerdns.com.'
212 query = dns.message.make_query(name, 'A', 'IN')
213 query.id = 0
214 query.flags &= ~dns.flags.RD
215 expectedResponse = dns.message.make_response(query)
216 rrset = dns.rrset.from_text(name,
217 3600,
218 dns.rdataclass.IN,
219 dns.rdatatype.A,
220 '1.2.3.4')
221 expectedResponse.answer.append(rrset)
222
223 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
224 self.assertEquals(receivedResponse, expectedResponse)
225
226 def testDOHInvalid(self):
227 """
228 DOH: Invalid query
229 """
230 name = 'invalid.doh.tests.powerdns.com.'
231 invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
232 invalidQuery.id = 0
233 # first an invalid query
234 invalidQuery = invalidQuery.to_wire()
235 invalidQuery = invalidQuery[:-5]
236 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
237 self.assertEquals(receivedResponse, None)
238
239 # and now a valid one
240 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
241 query.id = 0
242 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
243 expectedQuery.id = 0
244 response = dns.message.make_response(query)
245 rrset = dns.rrset.from_text(name,
246 3600,
247 dns.rdataclass.IN,
248 dns.rdatatype.A,
249 '127.0.0.1')
250 response.answer.append(rrset)
251 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
252 self.assertTrue(receivedQuery)
253 self.assertTrue(receivedResponse)
254 receivedQuery.id = expectedQuery.id
255 self.assertEquals(expectedQuery, receivedQuery)
256 self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
257 self.assertEquals(response, receivedResponse)
258
259 class TestDOHAddingECS(DNSDistDOHTest):
260
261 _serverKey = 'server.key'
262 _serverCert = 'server.chain'
263 _serverName = 'tls.tests.dnsdist.org'
264 _caCert = 'ca.pem'
265 _dohServerPort = 8443
266 _serverName = 'tls.tests.dnsdist.org'
267 _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
268 _config_template = """
269 newServer{address="127.0.0.1:%s", useClientSubnet=true}
270 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
271 setECSOverride(true)
272 """
273 _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
274
275 def testDOHSimple(self):
276 """
277 DOH with ECS: Simple query
278 """
279 name = 'simple.doh-ecs.tests.powerdns.com.'
280 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
281 query.id = 0
282 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
283 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
284 response = dns.message.make_response(query)
285 rrset = dns.rrset.from_text(name,
286 3600,
287 dns.rdataclass.IN,
288 dns.rdatatype.A,
289 '127.0.0.1')
290 response.answer.append(rrset)
291
292 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
293 self.assertTrue(receivedQuery)
294 self.assertTrue(receivedResponse)
295 expectedQuery.id = receivedQuery.id
296 self.assertEquals(expectedQuery, receivedQuery)
297 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
298 self.assertEquals(response, receivedResponse)
299 self.checkResponseNoEDNS(response, receivedResponse)
300
301 def testDOHExistingEDNS(self):
302 """
303 DOH with ECS: Existing EDNS
304 """
305 name = 'existing-edns.doh-ecs.tests.powerdns.com.'
306 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
307 query.id = 0
308 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
309 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
310 response = dns.message.make_response(query)
311 rrset = dns.rrset.from_text(name,
312 3600,
313 dns.rdataclass.IN,
314 dns.rdatatype.A,
315 '127.0.0.1')
316 response.answer.append(rrset)
317
318 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
319 self.assertTrue(receivedQuery)
320 self.assertTrue(receivedResponse)
321 receivedQuery.id = expectedQuery.id
322 self.assertEquals(expectedQuery, receivedQuery)
323 self.assertEquals(response, receivedResponse)
324 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
325 self.checkResponseEDNSWithoutECS(response, receivedResponse)
326
327 def testDOHExistingECS(self):
328 """
329 DOH with ECS: Existing EDNS Client Subnet
330 """
331 name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
332 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
333 rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
334 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
335 query.id = 0
336 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
337 response = dns.message.make_response(query)
338 response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
339 rrset = dns.rrset.from_text(name,
340 3600,
341 dns.rdataclass.IN,
342 dns.rdatatype.A,
343 '127.0.0.1')
344 response.answer.append(rrset)
345
346 (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
347 self.assertTrue(receivedQuery)
348 self.assertTrue(receivedResponse)
349 receivedQuery.id = expectedQuery.id
350 self.assertEquals(expectedQuery, receivedQuery)
351 self.assertEquals(response, receivedResponse)
352 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
353 self.checkResponseEDNSWithECS(response, receivedResponse)