_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
+ _tlsServerPort = 8453
_dohServerPort = 8443
_dohBaseURL = ("https://%s:%d/dns-query" % (_serverName, _dohServerPort))
- _config_params = ['_testServerPort', '_protobufServerPort', '_dohServerPort', '_serverCert', '_serverKey']
+ _config_params = ['_testServerPort', '_protobufServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_dohServerPort', '_serverCert', '_serverKey']
_config_template = """
newServer{address="127.0.0.1:%d"}
rl = newRemoteLogger('127.0.0.1:%d')
+
+ addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
addDOHLocal("127.0.0.1:%s", "%s", "%s", { '/dns-query' }, { keepIncomingHeaders=true })
local mytags = {path='doh-path', host='doh-host', ['query-string']='doh-query-string', scheme='doh-scheme', agent='doh-header:user-agent'}
'127.0.0.1')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, response=response)
-
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEqual(query, receivedQuery)
- self.assertEqual(response, receivedResponse)
-
- # let the protobuf messages the time to get there
- time.sleep(1)
-
- # check the protobuf message corresponding to the UDP query
- msg = self.getFirstProtobufMessage()
-
- self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.DOH, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- self.assertEqual(len(msg.meta), 5)
- tags = {}
- for entry in msg.meta:
- self.assertEqual(len(entry.value.stringVal), 1)
- tags[entry.key] = entry.value.stringVal[0]
-
- self.assertIn('agent', tags)
- self.assertIn('PycURL', tags['agent'])
- self.assertIn('host', tags)
- self.assertEqual(tags['host'], self._serverName + ':' + str(self._dohServerPort))
- self.assertIn('path', tags)
- self.assertEqual(tags['path'], '/dns-query')
- self.assertIn('query-string', tags)
- self.assertIn('?dns=', tags['query-string'])
- self.assertIn('scheme', tags)
- self.assertEqual(tags['scheme'], 'https')
-
- # check the protobuf message corresponding to the UDP response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.DOH, response)
- self.assertEqual(len(msg.meta), 5)
- tags = {}
- for entry in msg.meta:
- self.assertEqual(len(entry.value.stringVal), 1)
- tags[entry.key] = entry.value.stringVal[0]
-
- self.assertIn('agent', tags)
- self.assertIn('PycURL', tags['agent'])
- self.assertIn('host', tags)
- self.assertEqual(tags['host'], self._serverName + ':' + str(self._dohServerPort))
- self.assertIn('path', tags)
- self.assertEqual(tags['path'], '/dns-query')
- self.assertIn('query-string', tags)
- self.assertIn('?dns=', tags['query-string'])
- self.assertIn('scheme', tags)
- self.assertEqual(tags['scheme'], 'https')
+ for method in ("sendUDPQuery", "sendTCPQuery", "sendDOTQueryWrapper", "sendDOHQueryWrapper"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)
+
+ # let the protobuf messages the time to get there
+ time.sleep(1)
+
+ # check the protobuf message corresponding to the query
+ msg = self.getFirstProtobufMessage()
+
+ if method == "sendUDPQuery":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.UDP
+ elif method == "sendTCPQuery":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.TCP
+ elif method == "sendDOTQueryWrapper":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.DOT
+ elif method == "sendDOHQueryWrapper":
+ pbMessageType = dnsmessage_pb2.PBDNSMessage.DOH
+
+ print(method)
+ self.checkProtobufQuery(msg, pbMessageType, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+ self.assertEqual(len(msg.meta), 5)
+ tags = {}
+ for entry in msg.meta:
+ if method == "sendDOHQueryWrapper":
+ self.assertEqual(len(entry.value.stringVal), 1)
+ tags[entry.key] = entry.value.stringVal[0]
+ else:
+ self.assertEqual(len(entry.value.stringVal), 0)
+ tags[entry.key] = None
+
+ self.assertIn('agent', tags)
+ if method == "sendDOHQueryWrapper":
+ self.assertIn('PycURL', tags['agent'])
+ self.assertIn('host', tags)
+ self.assertEqual(tags['host'], self._serverName + ':' + str(self._dohServerPort))
+ self.assertIn('path', tags)
+ self.assertEqual(tags['path'], '/dns-query')
+ self.assertIn('query-string', tags)
+ self.assertIn('?dns=', tags['query-string'])
+ self.assertIn('scheme', tags)
+ self.assertEqual(tags['scheme'], 'https')
+
+ # check the protobuf message corresponding to the response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, pbMessageType, response)
+ self.assertEqual(len(msg.meta), 5)
+ tags = {}
+ for entry in msg.meta:
+ if method == "sendDOHQueryWrapper":
+ self.assertEqual(len(entry.value.stringVal), 1)
+ tags[entry.key] = entry.value.stringVal[0]
+ else:
+ self.assertEqual(len(entry.value.stringVal), 0)
+ tags[entry.key] = None
+
+ self.assertIn('agent', tags)
+ if method == "sendDOHQueryWrapper":
+ self.assertIn('PycURL', tags['agent'])
+ self.assertIn('host', tags)
+ self.assertEqual(tags['host'], self._serverName + ':' + str(self._dohServerPort))
+ self.assertIn('path', tags)
+ self.assertEqual(tags['path'], '/dns-query')
+ self.assertIn('query-string', tags)
+ self.assertIn('?dns=', tags['query-string'])
+ self.assertIn('scheme', tags)
+ self.assertEqual(tags['scheme'], 'https')
class TestProtobufMetaProxy(DNSDistProtobufTest):