]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_OutgoingTLS.py
Merge pull request #13864 from omoerbeek/frmstrm-v6
[thirdparty/pdns.git] / regression-tests.dnsdist / test_OutgoingTLS.py
CommitLineData
7d2856a6
RG
1#!/usr/bin/env python
2import dns
eaee76fe 3import requests
7d2856a6
RG
4import ssl
5import threading
6import time
7
630eb526 8from dnsdisttests import DNSDistTest, pickAvailablePort
7d2856a6
RG
9
10class OutgoingTLSTests(object):
11
eaee76fe 12 _webTimeout = 2.0
630eb526 13 _webServerPort = pickAvailablePort()
eaee76fe
RG
14 _webServerBasicAuthPassword = 'secret'
15 _webServerAPIKey = 'apisecret'
7b8a394a
RG
16 _webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
17 _webServerAPIKeyHashed = '$scrypt$ln=10,p=1,r=8$9v8JxDfzQVyTpBkTbkUqYg==$bDQzAOHeK1G9UvTPypNhrX48w974ZXbFPtRKS34+aso='
eaee76fe
RG
18 _verboseMode = True
19
20 def checkOnlyTLSResponderHit(self, numberOfTLSQueries=1):
7d2856a6
RG
21 self.assertNotIn('UDP Responder', self._responsesCounter)
22 self.assertNotIn('TCP Responder', self._responsesCounter)
eaee76fe
RG
23 self.assertEqual(self._responsesCounter['TLS Responder'], numberOfTLSQueries)
24
25 def getServerStat(self, key):
26 headers = {'x-api-key': self._webServerAPIKey}
27 url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost'
28 r = requests.get(url, headers=headers, timeout=self._webTimeout)
29 self.assertTrue(r)
30 self.assertEqual(r.status_code, 200)
31 self.assertTrue(r.json())
32 content = r.json()
33 self.assertTrue(len(content['servers']), 1)
34 server = content['servers'][0]
35 self.assertIn(key, server)
36 return server[key]
37
7d2856a6
RG
38 def testUDP(self):
39 """
40 Outgoing TLS: UDP query is sent via TLS
41 """
42 name = 'udp.outgoing-tls.test.powerdns.com.'
43 query = dns.message.make_query(name, 'A', 'IN')
44 expectedResponse = dns.message.make_response(query)
45 rrset = dns.rrset.from_text(name,
46 60,
47 dns.rdataclass.IN,
48 dns.rdatatype.A,
49 '127.0.0.1')
50 expectedResponse.answer.append(rrset)
51
eaee76fe
RG
52 numberOfUDPQueries = 10
53 for _ in range(numberOfUDPQueries):
54 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, expectedResponse)
645a1ca4 55 receivedQuery.id = query.id
eaee76fe
RG
56 self.assertEqual(query, receivedQuery)
57 self.assertEqual(receivedResponse, expectedResponse)
58
59 # there was one TCP query
60 numberOfQueries = numberOfUDPQueries + 1
61 self.checkOnlyTLSResponderHit(numberOfUDPQueries)
62 # our TLS responder does only one query per connection, so we need one for the TCP
63 # query and one for the UDP one (the TCP test is done first)
64 self.assertEqual(self.getServerStat('tcpNewConnections'), numberOfQueries)
65 # we tried to reuse the connection (and then it failed but hey)
66 self.assertEqual(self.getServerStat('tcpReusedConnections'), numberOfQueries - 1)
67 # we resumed the TLS session, though, but since we only learn about that
2be4efc5
RG
68 # when the connection is closed, we might be off by one, except if a health check
69 # allowed the first TCP connection to be resumed as well
70 self.assertGreaterEqual(self.getServerStat('tlsResumptions'), numberOfUDPQueries - 1)
71 self.assertLessEqual(self.getServerStat('tlsResumptions'), numberOfUDPQueries)
7d2856a6
RG
72
73 def testTCP(self):
74 """
75 Outgoing TLS: TCP query is sent via TLS
76 """
77 name = 'tcp.outgoing-tls.test.powerdns.com.'
78 query = dns.message.make_query(name, 'A', 'IN')
79 expectedResponse = dns.message.make_response(query)
80 rrset = dns.rrset.from_text(name,
81 60,
82 dns.rdataclass.IN,
83 dns.rdatatype.A,
84 '127.0.0.1')
85 expectedResponse.answer.append(rrset)
86
87 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, expectedResponse)
645a1ca4 88 receivedQuery.id = query.id
7d2856a6
RG
89 self.assertEqual(query, receivedQuery)
90 self.assertEqual(receivedResponse, expectedResponse)
91 self.checkOnlyTLSResponderHit()
eaee76fe
RG
92 self.assertEqual(self.getServerStat('tcpNewConnections'), 1)
93 self.assertEqual(self.getServerStat('tcpReusedConnections'), 0)
94 self.assertEqual(self.getServerStat('tlsResumptions'), 0)
7d2856a6
RG
95
96class BrokenOutgoingTLSTests(object):
97
eaee76fe 98 _webTimeout = 2.0
630eb526 99 _webServerPort = pickAvailablePort()
eaee76fe
RG
100 _webServerBasicAuthPassword = 'secret'
101 _webServerAPIKey = 'apisecret'
7b8a394a
RG
102 _webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
103 _webServerAPIKeyHashed = '$scrypt$ln=10,p=1,r=8$9v8JxDfzQVyTpBkTbkUqYg==$bDQzAOHeK1G9UvTPypNhrX48w974ZXbFPtRKS34+aso='
eaee76fe 104
7d2856a6
RG
105 def checkNoResponderHit(self):
106 self.assertNotIn('UDP Responder', self._responsesCounter)
107 self.assertNotIn('TCP Responder', self._responsesCounter)
fd71df4e 108 self.assertNotIn('TLS Responder', self._responsesCounter)
7d2856a6
RG
109
110 def testUDP(self):
111 """
112 Outgoing TLS (broken): UDP query is sent via TLS
113 """
114 name = 'udp.broken-outgoing-tls.test.powerdns.com.'
115 query = dns.message.make_query(name, 'A', 'IN')
116
117 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
118 self.assertEqual(receivedResponse, None)
119 self.checkNoResponderHit()
120
121 def testTCP(self):
122 """
123 Outgoing TLS (broken): TCP query is sent via TLS
124 """
125 name = 'tcp.broken-outgoing-tls.test.powerdns.com.'
126 query = dns.message.make_query(name, 'A', 'IN')
127 expectedResponse = dns.message.make_response(query)
128 rrset = dns.rrset.from_text(name,
129 60,
130 dns.rdataclass.IN,
131 dns.rdatatype.A,
132 '127.0.0.1')
133 expectedResponse.answer.append(rrset)
134
135 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
136 self.assertEqual(receivedResponse, None)
137 self.checkNoResponderHit()
138
139class TestOutgoingTLSOpenSSL(DNSDistTest, OutgoingTLSTests):
630eb526 140 _tlsBackendPort = pickAvailablePort()
7b8a394a 141 _config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
7d2856a6 142 _config_template = """
eaee76fe 143 setMaxTCPClientThreads(1)
7d2856a6 144 newServer{address="127.0.0.1:%s", tls='openssl', validateCertificates=true, caStore='ca.pem', subjectName='powerdns.com'}
eaee76fe
RG
145 webserver("127.0.0.1:%s")
146 setWebserverConfig({password="%s", apiKey="%s"})
7d2856a6
RG
147 """
148
bff62869
RG
149 @staticmethod
150 def sniCallback(sslSocket, sni, sslContext):
151 assert(sni == 'powerdns.com')
152 return None
153
7d2856a6
RG
154 @classmethod
155 def startResponders(cls):
156 tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
157 tlsContext.load_cert_chain('server.chain', 'server.key')
bff62869
RG
158 # requires Python 3.7+
159 if hasattr(tlsContext, 'sni_callback'):
160 tlsContext.sni_callback = cls.sniCallback
7d2856a6
RG
161
162 print("Launching TLS responder..")
163 cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
630eb526 164 cls._TLSResponder.daemon = True
7d2856a6
RG
165 cls._TLSResponder.start()
166
167class TestOutgoingTLSGnuTLS(DNSDistTest, OutgoingTLSTests):
630eb526 168 _tlsBackendPort = pickAvailablePort()
7b8a394a 169 _config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
7d2856a6 170 _config_template = """
eaee76fe 171 setMaxTCPClientThreads(1)
7d2856a6 172 newServer{address="127.0.0.1:%s", tls='gnutls', validateCertificates=true, caStore='ca.pem', subjectName='powerdns.com'}
eaee76fe
RG
173 webserver("127.0.0.1:%s")
174 setWebserverConfig({password="%s", apiKey="%s"})
7d2856a6
RG
175 """
176
177 @classmethod
178 def startResponders(cls):
179 tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
180 tlsContext.load_cert_chain('server.chain', 'server.key')
eaee76fe 181 tlsContext.keylog_filename = "/tmp/keys"
7d2856a6
RG
182
183 print("Launching TLS responder..")
184 cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
630eb526 185 cls._TLSResponder.daemon = True
7d2856a6
RG
186 cls._TLSResponder.start()
187
188class TestOutgoingTLSOpenSSLWrongCertName(DNSDistTest, BrokenOutgoingTLSTests):
630eb526 189 _tlsBackendPort = pickAvailablePort()
7b8a394a 190 _config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
7d2856a6 191 _config_template = """
eaee76fe 192 setMaxTCPClientThreads(1)
7d2856a6 193 newServer{address="127.0.0.1:%s", tls='openssl', validateCertificates=true, caStore='ca.pem', subjectName='not-powerdns.com'}
eaee76fe
RG
194 webserver("127.0.0.1:%s")
195 setWebserverConfig({password="%s", apiKey="%s"})
7d2856a6
RG
196 """
197
198 @classmethod
199 def startResponders(cls):
200 tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
201 tlsContext.load_cert_chain('server.chain', 'server.key')
202
203 print("Launching TLS responder..")
204 cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
630eb526 205 cls._TLSResponder.daemon = True
7d2856a6
RG
206 cls._TLSResponder.start()
207
208class TestOutgoingTLSGnuTLSWrongCertName(DNSDistTest, BrokenOutgoingTLSTests):
630eb526 209 _tlsBackendPort = pickAvailablePort()
7b8a394a 210 _config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
7d2856a6 211 _config_template = """
eaee76fe 212 setMaxTCPClientThreads(1)
7d2856a6 213 newServer{address="127.0.0.1:%s", tls='gnutls', validateCertificates=true, caStore='ca.pem', subjectName='not-powerdns.com'}
eaee76fe
RG
214 webserver("127.0.0.1:%s")
215 setWebserverConfig({password="%s", apiKey="%s"})
7d2856a6
RG
216 """
217
218 @classmethod
219 def startResponders(cls):
220 tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
221 tlsContext.load_cert_chain('server.chain', 'server.key')
222
223 print("Launching TLS responder..")
224 cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
630eb526 225 cls._TLSResponder.daemon = True
7d2856a6
RG
226 cls._TLSResponder.start()
227
228class TestOutgoingTLSOpenSSLWrongCertNameButNoCheck(DNSDistTest, OutgoingTLSTests):
630eb526 229 _tlsBackendPort = pickAvailablePort()
7b8a394a 230 _config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
7d2856a6 231 _config_template = """
eaee76fe 232 setMaxTCPClientThreads(1)
7d2856a6 233 newServer{address="127.0.0.1:%s", tls='openssl', validateCertificates=false, caStore='ca.pem', subjectName='not-powerdns.com'}
eaee76fe
RG
234 webserver("127.0.0.1:%s")
235 setWebserverConfig({password="%s", apiKey="%s"})
7d2856a6
RG
236 """
237
238 @classmethod
239 def startResponders(cls):
240 tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
241 tlsContext.load_cert_chain('server.chain', 'server.key')
242
243 print("Launching TLS responder..")
244 cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
630eb526 245 cls._TLSResponder.daemon = True
7d2856a6
RG
246 cls._TLSResponder.start()
247
248class TestOutgoingTLSGnuTLSWrongCertNameButNoCheck(DNSDistTest, OutgoingTLSTests):
630eb526 249 _tlsBackendPort = pickAvailablePort()
7b8a394a 250 _config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
7d2856a6 251 _config_template = """
eaee76fe 252 setMaxTCPClientThreads(1)
7d2856a6 253 newServer{address="127.0.0.1:%s", tls='gnutls', validateCertificates=false, caStore='ca.pem', subjectName='not-powerdns.com'}
eaee76fe
RG
254 webserver("127.0.0.1:%s")
255 setWebserverConfig({password="%s", apiKey="%s"})
7d2856a6
RG
256 """
257
258 @classmethod
259 def startResponders(cls):
260 tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
261 tlsContext.load_cert_chain('server.chain', 'server.key')
262
263 print("Launching TLS responder..")
264 cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
630eb526 265 cls._TLSResponder.daemon = True
7d2856a6 266 cls._TLSResponder.start()