#!/usr/bin/env python
+import base64
import threading
import socket
import struct
import sys
import time
-from dnsdisttests import DNSDistTest, Queue
+from dnsdisttests import DNSDistTest, pickAvailablePort, Queue
+from proxyprotocol import ProxyProtocol
import dns
import dnsmessage_pb2
+import extendederrors
class DNSDistProtobufTest(DNSDistTest):
- _protobufServerPort = 4242
+ _protobufServerPort = pickAvailablePort()
_protobufQueue = Queue()
_protobufServerID = 'dnsdist-server-1'
_protobufCounter = 0
@classmethod
def startResponders(cls):
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._protobufListener = threading.Thread(name='Protobuf Listener', target=cls.ProtobufListener, args=[cls._protobufServerPort])
- cls._protobufListener.setDaemon(True)
+ cls._protobufListener.daemon = True
cls._protobufListener.start()
def getFirstProtobufMessage(self):
msg.ParseFromString(data)
return msg
- def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True):
+ def checkProtobufBase(self, msg, protocol, query, initiator, normalQueryResponse=True, v6=False):
self.assertTrue(msg)
self.assertTrue(msg.HasField('timeSec'))
self.assertTrue(msg.HasField('socketFamily'))
- self.assertEqual(msg.socketFamily, dnsmessage_pb2.PBDNSMessage.INET)
+ if v6:
+ self.assertEqual(msg.socketFamily, dnsmessage_pb2.PBDNSMessage.INET6)
+ else:
+ self.assertEqual(msg.socketFamily, dnsmessage_pb2.PBDNSMessage.INET)
self.assertTrue(msg.HasField('from'))
fromvalue = getattr(msg, 'from')
- self.assertEqual(socket.inet_ntop(socket.AF_INET, fromvalue), initiator)
+ if v6:
+ self.assertEqual(socket.inet_ntop(socket.AF_INET6, fromvalue), initiator)
+ else:
+ self.assertEqual(socket.inet_ntop(socket.AF_INET, fromvalue), initiator)
self.assertTrue(msg.HasField('socketProtocol'))
self.assertEqual(msg.socketProtocol, protocol)
self.assertTrue(msg.HasField('messageId'))
self.assertTrue(msg.HasField('serverIdentity'))
self.assertEqual(msg.serverIdentity, self._protobufServerID.encode('utf-8'))
- if normalQueryResponse:
+ if normalQueryResponse and (protocol == dnsmessage_pb2.PBDNSMessage.UDP or protocol == dnsmessage_pb2.PBDNSMessage.TCP):
# compare inBytes with length of query/response
self.assertEqual(msg.inBytes, len(query.to_wire()))
# dnsdist doesn't set the existing EDNS Subnet for now,
# self.assertEqual(len(msg.originalRequestorSubnet), 4)
# self.assertEqual(socket.inet_ntop(socket.AF_INET, msg.originalRequestorSubnet), '127.0.0.1')
- def checkProtobufQuery(self, msg, protocol, query, qclass, qtype, qname, initiator='127.0.0.1'):
+ def checkProtobufQuery(self, msg, protocol, query, qclass, qtype, qname, initiator='127.0.0.1', v6=False):
self.assertEqual(msg.type, dnsmessage_pb2.PBDNSMessage.DNSQueryType)
- self.checkProtobufBase(msg, protocol, query, initiator)
+ self.checkProtobufBase(msg, protocol, query, initiator, v6=v6)
# dnsdist doesn't fill the responder field for responses
# because it doesn't keep the information around.
self.assertTrue(msg.HasField('to'))
- self.assertEqual(socket.inet_ntop(socket.AF_INET, msg.to), '127.0.0.1')
+ if not v6:
+ self.assertEqual(socket.inet_ntop(socket.AF_INET, msg.to), '127.0.0.1')
self.assertTrue(msg.HasField('question'))
self.assertTrue(msg.question.HasField('qClass'))
self.assertEqual(msg.question.qClass, qclass)
self.assertTrue(msg.HasField('response'))
self.assertTrue(msg.response.HasField('queryTimeSec'))
- def checkProtobufResponse(self, msg, protocol, response, initiator='127.0.0.1'):
+ def checkProtobufResponse(self, msg, protocol, response, initiator='127.0.0.1', v6=False):
self.assertEqual(msg.type, dnsmessage_pb2.PBDNSMessage.DNSResponseType)
- self.checkProtobufBase(msg, protocol, response, initiator)
+ self.checkProtobufBase(msg, protocol, response, initiator, v6=v6)
self.assertTrue(msg.HasField('response'))
self.assertTrue(msg.response.HasField('queryTimeSec'))
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
- # let the protobuf messages the time to get there
- time.sleep(1)
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
# check the protobuf message corresponding to the UDP query
msg = self.getFirstProtobufMessage()
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
- # let the protobuf messages the time to get there
- time.sleep(1)
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
# check the protobuf message corresponding to the TCP query
msg = self.getFirstProtobufMessage()
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
-
- # let the protobuf messages the time to get there
- time.sleep(1)
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
# check the protobuf message corresponding to the UDP query
msg = self.getFirstProtobufMessage()
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
- # let the protobuf messages the time to get there
- time.sleep(1)
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
# check the protobuf message corresponding to the TCP query
msg = self.getFirstProtobufMessage()
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 3600)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1')
+class TestProtobufMetaTags(DNSDistProtobufTest):
+ _config_params = ['_testServerPort', '_protobufServerPort']
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ rl = newRemoteLogger('127.0.0.1:%d')
+
+ addAction(AllRule(), SetTagAction('my-tag-key', 'my-tag-value'))
+ addAction(AllRule(), SetTagAction('my-empty-key', ''))
+ addAction(AllRule(), RemoteLogAction(rl, nil, {serverID='dnsdist-server-1', exportTags='*'}, {b64='b64-content', ['my-tag-export-name']='tag:my-tag-key'}))
+ addResponseAction(AllRule(), SetTagResponseAction('my-tag-key2', 'my-tag-value2'))
+ addResponseAction(AllRule(), RemoteLogResponseAction(rl, nil, false, {serverID='dnsdist-server-1', exportTags='my-empty-key,my-tag-key2'}, {['my-tag-export-name']='tags'}))
+ """
+
+ def testProtobufMeta(self):
+ """
+ Protobuf: Meta values
+ """
+ name = 'meta.protobuf.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)
+
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
+
+ # check the protobuf message corresponding to the UDP query
+ msg = self.getFirstProtobufMessage()
+
+ self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+ # regular tags
+ self.assertEqual(len(msg.response.tags), 2)
+ self.assertIn('my-tag-key:my-tag-value', msg.response.tags)
+ self.assertIn('my-empty-key', msg.response.tags)
+ # meta tags
+ self.assertEqual(len(msg.meta), 2)
+ tags = {}
+ for entry in msg.meta:
+ tags[entry.key] = entry.value.stringVal
+
+ self.assertIn('b64', tags)
+ self.assertIn('my-tag-export-name', tags)
+
+ b64EncodedQuery = base64.b64encode(query.to_wire()).decode('ascii')
+ self.assertEqual(tags['b64'], [b64EncodedQuery])
+ self.assertEqual(tags['my-tag-export-name'], ['my-tag-value'])
+
+ # check the protobuf message corresponding to the UDP response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, response)
+ # regular tags
+ self.assertEqual(len(msg.response.tags), 2)
+ self.assertIn('my-tag-key2:my-tag-value2', msg.response.tags)
+ self.assertIn('my-empty-key', msg.response.tags)
+ # meta tags
+ self.assertEqual(len(msg.meta), 1)
+ self.assertEqual(msg.meta[0].key, 'my-tag-export-name')
+ self.assertEqual(len(msg.meta[0].value.stringVal), 3)
+ self.assertIn('my-tag-key:my-tag-value', msg.meta[0].value.stringVal)
+ self.assertIn('my-tag-key2:my-tag-value2', msg.meta[0].value.stringVal)
+ # no ':' when the value is empty
+ self.assertIn('my-empty-key', msg.meta[0].value.stringVal)
+
+class TestProtobufExtendedDNSErrorTags(DNSDistProtobufTest):
+ _config_params = ['_testServerPort', '_protobufServerPort']
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ rl = newRemoteLogger('127.0.0.1:%d')
+
+ addAction(AllRule(), RemoteLogAction(rl, nil, {serverID='dnsdist-server-1'}))
+ addResponseAction(AllRule(), RemoteLogResponseAction(rl, nil, false, {serverID='dnsdist-server-1', exportExtendedErrorsToMeta='extended-error'}))
+ """
+
+ def testProtobufExtendedError(self):
+ """
+ Protobuf: Extended Error
+ """
+ name = 'extended-error.protobuf.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+ ede = extendederrors.ExtendedErrorOption(15, b'Blocked by RPZ!')
+ response.use_edns(edns=True, payload=4096, options=[ede])
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)
+
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
+
+ # check the protobuf message corresponding to the UDP query
+ msg = self.getFirstProtobufMessage()
+
+ self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+
+ # meta tags
+ self.assertEqual(len(msg.meta), 0)
+
+ # check the protobuf message corresponding to the UDP response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, response)
+
+ # meta tags
+ self.assertEqual(len(msg.meta), 1)
+
+ self.assertEqual(msg.meta[0].key, 'extended-error')
+ self.assertEqual(len(msg.meta[0].value.intVal), 1)
+ self.assertEqual(len(msg.meta[0].value.stringVal), 1)
+ self.assertIn(15, msg.meta[0].value.intVal)
+ self.assertIn('Blocked by RPZ!', msg.meta[0].value.stringVal)
+
+class TestProtobufMetaDOH(DNSDistProtobufTest):
+
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _tlsServerPort = pickAvailablePort()
+ _dohWithNGHTTP2ServerPort = pickAvailablePort()
+ _dohWithNGHTTP2BaseURL = ("https://%s:%d/dns-query" % (_serverName, _dohWithNGHTTP2ServerPort))
+ _dohWithH2OServerPort = pickAvailablePort()
+ _dohWithH2OBaseURL = ("https://%s:%d/dns-query" % (_serverName, _dohWithH2OServerPort))
+ _config_template = """
+ newServer{address="127.0.0.1:%d"}
+ rl = newRemoteLogger('127.0.0.1:%d')
+
+ addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
+ addDOHLocal("127.0.0.1:%d", "%s", "%s", { '/dns-query' }, { keepIncomingHeaders=true, library='nghttp2' })
+ addDOHLocal("127.0.0.1:%d", "%s", "%s", { '/dns-query' }, { keepIncomingHeaders=true, library='h2o' })
+
+ local mytags = {path='doh-path', host='doh-host', ['query-string']='doh-query-string', scheme='doh-scheme', agent='doh-header:user-agent'}
+ addAction(AllRule(), RemoteLogAction(rl, nil, {serverID='dnsdist-server-1'}, mytags))
+ addResponseAction(AllRule(), RemoteLogResponseAction(rl, nil, false, {serverID='dnsdist-server-1'}, mytags))
+ """
+ _config_params = ['_testServerPort', '_protobufServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_dohWithNGHTTP2ServerPort', '_serverCert', '_serverKey', '_dohWithH2OServerPort', '_serverCert', '_serverKey']
+
+ def testProtobufMetaDoH(self):
+ """
+ Protobuf: Meta values - DoH
+ """
+ name = 'meta-doh.protobuf.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery", "sendDOTQueryWrapper", "sendDOHWithNGHTTP2QueryWrapper", "sendDOHWithH2OQueryWrapper"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)
+
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
+
+ # check the protobuf message corresponding to the query
+ msg = self.getFirstProtobufMessage()
+
+ if method == "sendUDPQuery":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.UDP
+ elif method == "sendTCPQuery":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.TCP
+ elif method == "sendDOTQueryWrapper":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.DOT
+ elif method == "sendDOHWithNGHTTP2QueryWrapper" or method == "sendDOHWithH2OQueryWrapper":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.DOH
+
+ self.checkProtobufQuery(msg, pbMessageType, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+ self.assertEqual(len(msg.meta), 5)
+ tags = {}
+ for entry in msg.meta:
+ self.assertEqual(len(entry.value.stringVal), 1)
+ tags[entry.key] = entry.value.stringVal[0]
+
+ self.assertIn('agent', tags)
+ if method == "sendDOHWithNGHTTP2QueryWrapper" or method == "sendDOHWithH2OQueryWrapper":
+ self.assertIn('PycURL', tags['agent'])
+ self.assertIn('host', tags)
+ if method == "sendDOHWithNGHTTP2QueryWrapper":
+ self.assertEqual(tags['host'], self._serverName + ':' + str(self._dohWithNGHTTP2ServerPort))
+ elif method == "sendDOHWithH2OQueryWrapper":
+ self.assertEqual(tags['host'], self._serverName + ':' + str(self._dohWithH2OServerPort))
+ self.assertIn('path', tags)
+ self.assertEqual(tags['path'], '/dns-query')
+ self.assertIn('query-string', tags)
+ self.assertIn('?dns=', tags['query-string'])
+ self.assertIn('scheme', tags)
+ self.assertEqual(tags['scheme'], 'https')
+ self.assertEqual(msg.httpVersion, dnsmessage_pb2.PBDNSMessage.HTTPVersion.HTTP2)
+
+ # check the protobuf message corresponding to the response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, pbMessageType, response)
+ self.assertEqual(len(msg.meta), 5)
+ tags = {}
+ for entry in msg.meta:
+ self.assertEqual(len(entry.value.stringVal), 1)
+ tags[entry.key] = entry.value.stringVal[0]
+
+ self.assertIn('agent', tags)
+ if method == "sendDOHWithNGHTTP2QueryWrapper" or method == "sendDOHWithH2OQueryWrapper":
+ self.assertIn('PycURL', tags['agent'])
+ self.assertIn('host', tags)
+ if method == "sendDOHWithNGHTTP2QueryWrapper":
+ self.assertEqual(tags['host'], self._serverName + ':' + str(self._dohWithNGHTTP2ServerPort))
+ elif method == "sendDOHWithH2OQueryWrapper":
+ self.assertEqual(tags['host'], self._serverName + ':' + str(self._dohWithH2OServerPort))
+ self.assertIn('path', tags)
+ self.assertEqual(tags['path'], '/dns-query')
+ self.assertIn('query-string', tags)
+ self.assertIn('?dns=', tags['query-string'])
+ self.assertIn('scheme', tags)
+ self.assertEqual(tags['scheme'], 'https')
+
+class TestProtobufMetaProxy(DNSDistProtobufTest):
+
+ _config_params = ['_testServerPort', '_protobufServerPort']
+ _config_template = """
+ setProxyProtocolACL( { "127.0.0.1/32" } )
+
+ newServer{address="127.0.0.1:%d"}
+ rl = newRemoteLogger('127.0.0.1:%d')
+
+ local mytags = {pp='proxy-protocol-values', pp42='proxy-protocol-value:42'}
+ addAction(AllRule(), RemoteLogAction(rl, nil, {serverID='dnsdist-server-1'}, mytags))
+
+ -- proxy protocol values are NOT passed to the response
+ """
+
+ def testProtobufMetaProxy(self):
+ """
+ Protobuf: Meta values - Proxy
+ """
+ name = 'meta-proxy.protobuf.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ destAddr = "2001:db8::9"
+ destPort = 9999
+ srcAddr = "2001:db8::8"
+ srcPort = 8888
+ udpPayload = ProxyProtocol.getPayload(False, False, True, srcAddr, destAddr, srcPort, destPort, [ [ 2, b'foo'], [ 42, b'proxy'] ])
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(udpPayload + query.to_wire(), response, rawQuery=True)
+
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)
+
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
+
+ # check the protobuf message corresponding to the UDP query
+ msg = self.getFirstProtobufMessage()
+
+ self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name, initiator='2001:db8::8', v6=True)
+ self.assertEqual(len(msg.meta), 2)
+ tags = {}
+ for entry in msg.meta:
+ tags[entry.key] = entry.value.stringVal
+
+ self.assertIn('pp42', tags)
+ self.assertEqual(tags['pp42'], ['proxy'])
+ self.assertIn('pp', tags)
+ self.assertEqual(len(tags['pp']), 2)
+ self.assertIn('2:foo', tags['pp'])
+ self.assertIn('42:proxy', tags['pp'])
+
class TestProtobufIPCipher(DNSDistProtobufTest):
_config_params = ['_testServerPort', '_protobufServerPort', '_protobufServerID', '_protobufServerID']
_config_template = """
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
- # let the protobuf messages the time to get there
- time.sleep(1)
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
# check the protobuf message corresponding to the UDP query
msg = self.getFirstProtobufMessage()
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
- # let the protobuf messages the time to get there
- time.sleep(1)
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
# check the protobuf message corresponding to the TCP query
msg = self.getFirstProtobufMessage()
rr = msg.response.rrs[1]
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1')
+
+class TestProtobufQUIC(DNSDistProtobufTest):
+
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _doqServerPort = pickAvailablePort()
+ _doh3ServerPort = pickAvailablePort()
+ _dohBaseURL = ("https://%s:%d/" % (_serverName, _doh3ServerPort))
+ _config_template = """
+ newServer{address="127.0.0.1:%d"}
+ rl = newRemoteLogger('127.0.0.1:%d')
+
+ addDOQLocal("127.0.0.1:%d", "%s", "%s")
+ addDOH3Local("127.0.0.1:%d", "%s", "%s")
+
+ addAction(AllRule(), RemoteLogAction(rl, nil, {serverID='dnsdist-server-1'}))
+ """
+ _config_params = ['_testServerPort', '_protobufServerPort', '_doqServerPort', '_serverCert', '_serverKey', '_doh3ServerPort', '_serverCert', '_serverKey']
+
+ def testProtobufMetaDoH(self):
+ """
+ Protobuf: Test logged protocol for QUIC and DOH3
+ """
+ name = 'quic.protobuf.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ for method in ("sendDOQQueryWrapper", "sendDOH3QueryWrapper"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)
+
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
+
+ # check the protobuf message corresponding to the query
+ msg = self.getFirstProtobufMessage()
+
+ if method == "sendDOQQueryWrapper":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.DOQ
+ elif method == "sendDOH3QueryWrapper":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.DOH
+ self.assertEqual(msg.httpVersion, dnsmessage_pb2.PBDNSMessage.HTTPVersion.HTTP3)
+
+ self.checkProtobufQuery(msg, pbMessageType, query, dns.rdataclass.IN, dns.rdatatype.A, name)