]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
4943c96ec11443eb2e2b6befc8ea0843af571c4f
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 import pycurl
8
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
11
class DNSDistDOHTest(DNSDistTest):
    """
    Base class for the DNS over HTTPS (DoH) regression tests.

    Provides pycurl-based helpers that send a DNS query as an HTTP GET
    (``?dns=`` parameter) or as an HTTP POST, and return both the parsed
    answer and whatever was read from ``_fromResponderQueue``.

    The ``_toResponderQueue`` / ``_fromResponderQueue`` attributes are not
    defined in this class; they are expected to be provided by DNSDistTest.
    """

    @classmethod
    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
        """
        Build the URL of a DoH GET request.

        baseurl: URL the "?dns=" parameter is appended to.
        query: an object exposing to_wire(), or the wire-format bytes
               themselves when rawQuery is true.
        rawQuery: when true, query is used as is instead of being
                  serialized with to_wire().

        Returns baseurl followed by "?dns=" and the URL-safe base64
        encoding of the wire-format query, with the '=' padding stripped.
        """
        wire = query if rawQuery else query.to_wire()
        param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
        return baseurl + "?dns=" + param

    @classmethod
    def openDOHConnection(cls, port, caFile, timeout=2.0):
        """
        Create a pycurl handle asking for HTTP/2 with DoH headers.

        NOTE: port, caFile and timeout are accepted but not used here;
        the callers configure the URL, the name resolution and the CA
        file on the returned handle themselves.

        Returns the new pycurl.Curl object; the caller owns it and is
        responsible for closing it.
        """
        conn = pycurl.Curl()
        conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)

        conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                        "Accept: application/dns-message"])
        return conn

    @classmethod
    def _setupDOHConnection(cls, conn, url, servername, port, caFile):
        """
        Apply the options shared by the GET and POST helpers to conn:
        target URL, pinning of servername:port to 127.0.0.1, certificate
        and host name verification, and the CA file when one is given.
        """
        #conn.setopt(pycurl.VERBOSE, True)
        conn.setopt(pycurl.URL, url)
        # make curl connect to the loopback address while still using
        # servername for the TLS checks
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        if caFile:
            conn.setopt(pycurl.CAINFO, caFile)

    @classmethod
    def _exchangeDOH(cls, conn, response, timeout, useQueue):
        """
        Perform the request configured on conn.

        When response is truthy it is first put on _toResponderQueue.
        The HTTP body is parsed as a DNS message only if the HTTP status
        code is 200. When useQueue is true and _fromResponderQueue is
        not empty, one item is read from it.

        Returns a (receivedQuery, message) tuple; either may be None.
        """
        if response:
            cls._toResponderQueue.put(response, True, timeout)

        receivedQuery = None
        message = None
        data = conn.perform_rb()
        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        if rcode == 200:
            message = dns.message.from_wire(data)

        if useQueue and not cls._fromResponderQueue.empty():
            receivedQuery = cls._fromResponderQueue.get(True, timeout)

        return (receivedQuery, message)

    @classmethod
    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, customHeaders=None):
        """
        Send query as a DoH GET request and return the outcome.

        port, servername: the connection is made to 127.0.0.1:port while
                          servername is used in the URL resolution pin.
        baseurl: URL the "?dns=" parameter is appended to.
        query: message object, or raw bytes when rawQuery is true.
        response: if truthy, queued on _toResponderQueue before sending.
        timeout: timeout used for the queue operations only.
        caFile: optional CA bundle used to verify the server certificate.
        useQueue: whether to read from _fromResponderQueue afterwards.
        customHeaders: optional list of "Name: value" HTTP headers.

        NOTE: the headers passed here REPLACE the ones installed by
        openDOHConnection(), including when no custom header is given;
        this mirrors the historical behaviour of this helper.

        Returns a (receivedQuery, message) tuple; message is None unless
        the HTTP status code was 200.
        """
        # None instead of a shared mutable [] default
        headers = list(customHeaders) if customHeaders else []
        url = cls.getDOHGetURL(baseurl, query, rawQuery)
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        try:
            cls._setupDOHConnection(conn, url, servername, port, caFile)
            conn.setopt(pycurl.HTTPHEADER, headers)
            return cls._exchangeDOH(conn, response, timeout, useQueue)
        finally:
            # release the curl handle even if the transfer raised
            conn.close()

    @classmethod
    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False):
        """
        Send query as the body of a DoH POST request to baseurl.

        The parameters have the same meaning as for sendDOHQuery(), except
        that the headers set by openDOHConnection() are left in place.

        Returns a (receivedQuery, message) tuple; message is None unless
        the HTTP status code was 200.
        """
        body = query if rawQuery else query.to_wire()
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        try:
            cls._setupDOHConnection(conn, baseurl, servername, port, caFile)
            conn.setopt(pycurl.POST, True)
            conn.setopt(pycurl.POSTFIELDS, body)
            return cls._exchangeDOH(conn, response, timeout, useQueue)
        finally:
            # release the curl handle even if the transfer raised
            conn.close()
93
94 # @classmethod
95 # def openDOHConnection(cls, port, caFile, timeout=2.0):
96 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
97 # sslctx.load_verify_locations(caFile)
98 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
99
100 # @classmethod
101 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
102 # url = cls.getDOHGetURL(baseurl, query)
103
104 # if response:
105 # cls._toResponderQueue.put(response, True, timeout)
106
107 # conn.request('GET', url)
108
109 # @classmethod
110 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
111 # message = None
112 # data = conn.get_response()
113 # if data:
114 # data = data.read()
115 # if data:
116 # message = dns.message.from_wire(data)
117
118 # if useQueue and not cls._fromResponderQueue.empty():
119 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
120 # return (receivedQuery, message)
121 # else:
122 # return message
123
class TestDOH(DNSDistDOHTest):
    """
    DoH tests against a dnsdist configured with one backend, one DoH
    frontend serving "/", and rules that drop, refuse or spoof answers
    based on the query name, an HTTP header or the HTTP path.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })

    addAction("drop.doh.tests.powerdns.com.", DropAction())
    addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
    addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
    """
    # names of the attributes substituted, in order, into _config_template
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH: Simple query
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # the query sent without EDNS is expected to reach the backend
        # with EDNS and a 4096 payload size
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH: Simple POST query
        """
        name = 'simple-post.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH: Existing EDNS
        """
        name = 'existing-edns.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithoutECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)

    def testDropped(self):
        """
        DOH: Dropped query
        """
        name = 'drop.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        # no DNS message is returned unless the HTTP status code is 200
        self.assertEqual(receivedResponse, None)

    def testRefused(self):
        """
        DOH: Refused
        """
        name = 'refused.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOH: Spoofed
        """
        name = 'spoof.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOHInvalid(self):
        """
        DOH: Invalid query
        """
        name = 'invalid.doh.tests.powerdns.com.'
        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        invalidQuery.id = 0
        # first an invalid query: the wire format minus its last 5 bytes
        invalidQuery = invalidQuery.to_wire()
        invalidQuery = invalidQuery[:-5]
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
        self.assertEqual(receivedResponse, None)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHWithoutQuery(self):
        """
        DOH: Empty GET query
        """
        # plain GET on the base URL, without any "?dns=" parameter
        url = self._dohBaseURL
        conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
        try:
            conn.setopt(pycurl.URL, url)
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            conn.setopt(pycurl.CAINFO, self._caCert)
            # the body is irrelevant here, only the status code is checked
            conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            conn.close()
        self.assertEqual(rcode, 400)

    def testDOHEmptyPOST(self):
        """
        DOH: Empty POST query
        """
        name = 'empty-post.doh.tests.powerdns.com.'

        # first a POST with an empty body
        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
        self.assertEqual(receivedResponse, None)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHeaderRule(self):
        """
        DOH: HeaderRule
        """
        name = 'header-rule.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '2.3.4.5')
        expectedResponse.answer.append(rrset)

        # this header should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # this content of the header should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPath(self):
        """
        DOH: HTTPPath
        """
        name = 'http-path.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '3.4.5.6')
        expectedResponse.answer.append(rrset)

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
444
class TestDOHAddingECS(DNSDistDOHTest):
    """
    DoH tests against a dnsdist whose backend is declared with
    useClientSubnet=true and with setECSOverride(true), checking the
    EDNS Client Subnet option found in the query read from the
    responder queue.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    setECSOverride(true)
    """
    # names of the attributes substituted, in order, into _config_template
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH with ECS: Simple query
        """
        name = 'simple.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # ECS option expected in the query received by the responder
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # align the IDs so that only the rest of the message is compared
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH with ECS: Existing EDNS
        """
        name = 'existing-edns.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH with ECS: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
        # option sent by the client ...
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        # ... and the one expected in the query received by the responder
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)