]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_DOH.py
dnsdist: Fix headers handling in the DoH regression tests
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
CommitLineData
10535d88
RG
1#!/usr/bin/env python
2import base64
3import dns
4import clientsubnetoption
5from dnsdisttests import DNSDistTest
6
7import pycurl
811872fb 8from io import BytesIO
10535d88
RG
9#from hyper import HTTP20Connection
10#from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
11
class DNSDistDOHTest(DNSDistTest):
    """Base class for the DNS over HTTPS (DoH) regression tests.

    Provides helpers to build DoH GET URLs and to send GET / POST queries
    to dnsdist through pycurl, on top of the responder queues
    (_toResponderQueue / _fromResponderQueue) used by these helpers.
    """

    @classmethod
    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
        """Return the GET URL carrying 'query' in its 'dns' parameter.

        'query' is a DNS message object exposing to_wire(), unless
        'rawQuery' is true, in which case it is already the wire-format
        bytes and is used untouched.
        """
        wire = query if rawQuery else query.to_wire()
        # URL-safe base64 with the trailing '=' padding stripped
        param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
        return baseurl + "?dns=" + param

    @classmethod
    def openDOHConnection(cls, port, caFile, timeout=2.0):
        """Create a pycurl handle requesting HTTP/2 with the DoH content headers.

        'port', 'caFile' and 'timeout' are accepted but not used here:
        the callers set the URL, the resolution override and the CA file
        on the returned handle themselves.
        NOTE(review): no transfer timeout is ever applied to the handle,
        so a server that never answers would presumably block perform_rb().
        The caller owns the returned handle and has to close() it.
        """
        conn = pycurl.Curl()
        conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)

        conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                         "Accept: application/dns-message"])
        return conn

    @classmethod
    def _prepareDOHConnection(cls, conn, url, servername, port, caFile):
        """Point 'conn' at 'url', pinning 'servername' to 127.0.0.1 and enforcing TLS validation."""
        #conn.setopt(pycurl.VERBOSE, True)
        conn.setopt(pycurl.URL, url)
        # resolve the server name to the loopback address without touching DNS
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        if caFile:
            conn.setopt(pycurl.CAINFO, caFile)

    @classmethod
    def _exchangeDOHMessages(cls, conn, response, timeout, useQueue):
        """Perform the transfer configured on 'conn' and collect the results.

        If 'response' is set it is queued for the responder before the
        transfer. Returns a (receivedQuery, message) tuple: 'message' is
        the parsed DNS answer when the HTTP status is 200 and None
        otherwise; 'receivedQuery' is taken from the responder queue when
        'useQueue' is true and the queue is not empty, None otherwise.
        """
        if response:
            cls._toResponderQueue.put(response, True, timeout)

        receivedQuery = None
        message = None
        data = conn.perform_rb()
        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        if rcode == 200:
            message = dns.message.from_wire(data)

        if useQueue and not cls._fromResponderQueue.empty():
            receivedQuery = cls._fromResponderQueue.get(True, timeout)

        return (receivedQuery, message)

    @classmethod
    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, customHeaders=None):
        """Send 'query' as a DoH GET request and return (receivedQuery, message).

        'customHeaders' is an optional list of 'Name: value' strings sent
        as request headers; None (the default) means no custom header.
        The raw response headers are stored, as bytes, in
        cls._response_headers. See _exchangeDOHMessages() for the meaning
        of 'response', 'useQueue' and of the returned tuple.
        """
        # a None default avoids sharing one mutable list between calls
        headers = [] if customHeaders is None else customHeaders
        url = cls.getDOHGetURL(baseurl, query, rawQuery)
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        try:
            response_headers = BytesIO()
            cls._prepareDOHConnection(conn, url, servername, port, caFile)
            # NOTE(review): HTTPHEADER was already set by openDOHConnection();
            # this second call presumably replaces that list, dropping the
            # default Content-type / Accept headers - confirm this is intended.
            conn.setopt(pycurl.HTTPHEADER, headers)
            conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)

            # bytes, like the value stored after the transfer, so that
            # callers can always decode() it
            cls._response_headers = b''
            result = cls._exchangeDOHMessages(conn, response, timeout, useQueue)
            cls._response_headers = response_headers.getvalue()
            return result
        finally:
            # release the curl handle even when the transfer or the parsing fails
            conn.close()

    @classmethod
    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False):
        """Send 'query' as the body of a DoH POST request and return (receivedQuery, message).

        'query' is a DNS message object exposing to_wire(), unless
        'rawQuery' is true, in which case it is posted untouched.
        See _exchangeDOHMessages() for the meaning of 'response',
        'useQueue' and of the returned tuple.
        """
        data = query if rawQuery else query.to_wire()
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        try:
            cls._prepareDOHConnection(conn, baseurl, servername, port, caFile)
            conn.setopt(pycurl.POST, True)
            conn.setopt(pycurl.POSTFIELDS, data)
            return cls._exchangeDOHMessages(conn, response, timeout, useQueue)
        finally:
            # release the curl handle even when the transfer or the parsing fails
            conn.close()
97
10535d88
RG
98# @classmethod
99# def openDOHConnection(cls, port, caFile, timeout=2.0):
100# sslctx = SSLContext(PROTOCOL_TLSv1_2)
101# sslctx.load_verify_locations(caFile)
102# return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
103
104# @classmethod
105# def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
106# url = cls.getDOHGetURL(baseurl, query)
107
108# if response:
109# cls._toResponderQueue.put(response, True, timeout)
110
111# conn.request('GET', url)
112
113# @classmethod
114# def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
115# message = None
116# data = conn.get_response()
117# if data:
118# data = data.read()
119# if data:
120# message = dns.message.from_wire(data)
121
122# if useQueue and not cls._fromResponderQueue.empty():
123# receivedQuery = cls._fromResponderQueue.get(True, timeout)
124# return (receivedQuery, message)
125# else:
126# return message
127
class TestDOH(DNSDistDOHTest):
    """DoH regression tests against a dnsdist instance with one DoH frontend.

    The configuration below declares a single backend, a DoH listener
    adding two custom response headers, and rules dropping, refusing or
    spoofing answers based on the query name, an HTTP header or the HTTP path.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    # response headers expected from the customResponseHeaders setting below
    _customResponseHeader1 = 'access-control-allow-origin: *'
    _customResponseHeader2 = 'user-agent: derp'
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp"}})

    addAction("drop.doh.tests.powerdns.com.", DropAction())
    addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
    addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
    """
    # attributes substituted, in order, into the %s placeholders of _config_template
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH: Simple query
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # the query seen by the backend is expected to carry EDNS with a 4096 payload
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        # both custom response headers must be present in the raw response headers
        receivedHeaders = self._response_headers.decode()
        self.assertIn(self._customResponseHeader1, receivedHeaders)
        self.assertIn(self._customResponseHeader2, receivedHeaders)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH: Simple POST query
        """
        name = 'simple-post.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH: Existing EDNS
        """
        name = 'existing-edns.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithoutECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)

    def testDropped(self):
        """
        DOH: Dropped query
        """
        name = 'drop.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # matches the DropAction rule: no DNS message is expected back
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, None)

    def testRefused(self):
        """
        DOH: Refused
        """
        name = 'refused.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOH: Spoofed
        """
        name = 'spoof.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOHInvalid(self):
        """
        DOH: Invalid query
        """
        name = 'invalid.doh.tests.powerdns.com.'
        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        invalidQuery.id = 0
        # first an invalid query: the wire format truncated by five bytes
        invalidQuery = invalidQuery.to_wire()
        invalidQuery = invalidQuery[:-5]
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
        self.assertEqual(receivedResponse, None)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHWithoutQuery(self):
        """
        DOH: Empty GET query
        """
        # plain GET on the base URL, without any 'dns' parameter
        conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
        try:
            conn.setopt(pycurl.URL, self._dohBaseURL)
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            conn.setopt(pycurl.CAINFO, self._caCert)
            conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            # release the curl handle even if the transfer fails
            conn.close()
        self.assertEqual(rcode, 400)

    def testDOHEmptyPOST(self):
        """
        DOH: Empty POST query
        """
        name = 'empty-post.doh.tests.powerdns.com.'

        # first a POST with an empty body
        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
        self.assertEqual(receivedResponse, None)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHeaderRule(self):
        """
        DOH: HeaderRule
        """
        name = 'header-rule.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '2.3.4.5')
        expectedResponse.answer.append(rrset)

        # this header should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        # this content of the header should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPath(self):
        """
        DOH: HTTPPath
        """
        name = 'http-path.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '3.4.5.6')
        expectedResponse.answer.append(rrset)

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
452
10535d88
RG
class TestDOHAddingECS(DNSDistDOHTest):
    """DoH regression tests with EDNS Client Subnet handling enabled.

    The configuration below declares a backend with useClientSubnet=true,
    a DoH listener, and setECSOverride(true); the tests check the ECS
    option carried by the query received by the backend.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    setECSOverride(true)
    """
    # attributes substituted, in order, into the %s placeholders of _config_template
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH with ECS: Simple query
        """
        name = 'simple.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # the backend is expected to see an ECS option for 127.0.0.0/24
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH with ECS: Existing EDNS
        """
        name = 'existing-edns.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH with ECS: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        # the client-supplied ECS value is expected to be replaced by 127.0.0.0/24
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)