]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_TLS.py
Merge pull request #13612 from omoerbeek/rec-13588-followup
[thirdparty/pdns.git] / regression-tests.dnsdist / test_TLS.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import socket
5 import ssl
6 import subprocess
7 import time
8 import unittest
9 from dnsdisttests import DNSDistTest, pickAvailablePort
10
11 class TLSTests(object):
12
13 def getServerCertificate(self):
14 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
15 cert = conn.getpeercert()
16 conn.close()
17 return cert
18
19 def getTLSProvider(self):
20 return self.sendConsoleCommand("getBind(0):getEffectiveTLSProvider()").rstrip()
21
22 def testTLSSimple(self):
23 """
24 TLS: Single query
25 """
26 name = 'single.tls.tests.powerdns.com.'
27 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
28 response = dns.message.make_response(query)
29 rrset = dns.rrset.from_text(name,
30 3600,
31 dns.rdataclass.IN,
32 dns.rdatatype.A,
33 '127.0.0.1')
34 response.answer.append(rrset)
35
36 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
37
38 self.sendTCPQueryOverConnection(conn, query, response=response)
39 (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
40 self.assertTrue(receivedQuery)
41 self.assertTrue(receivedResponse)
42 receivedQuery.id = query.id
43 self.assertEqual(query, receivedQuery)
44 self.assertEqual(response, receivedResponse)
45
46 # check the certificate
47 cert = self.getServerCertificate()
48 self.assertIn('subject', cert)
49 self.assertIn('serialNumber', cert)
50 self.assertIn('subjectAltName', cert)
51 subject = cert['subject']
52 altNames = cert['subjectAltName']
53 self.assertEqual(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org')
54 self.assertEqual(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV')
55 names = []
56 for entry in altNames:
57 names.append(entry[1])
58 self.assertEqual(names, ['tls.tests.dnsdist.org', 'powerdns.com', '127.0.0.1'])
59 serialNumber = cert['serialNumber']
60
61 self.generateNewCertificateAndKey('server-tls')
62 self.sendConsoleCommand("reloadAllCertificates()")
63
64 conn.close()
65 # open a new connection
66 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
67
68 self.sendTCPQueryOverConnection(conn, query, response=response)
69 (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
70 self.assertTrue(receivedQuery)
71 self.assertTrue(receivedResponse)
72 receivedQuery.id = query.id
73 self.assertEqual(query, receivedQuery)
74 self.assertEqual(response, receivedResponse)
75
76 # check that the certificate is OK
77 cert = self.getServerCertificate()
78 self.assertIn('subject', cert)
79 self.assertIn('serialNumber', cert)
80 self.assertIn('subjectAltName', cert)
81 subject = cert['subject']
82 altNames = cert['subjectAltName']
83 self.assertEqual(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org')
84 self.assertEqual(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV')
85 names = []
86 for entry in altNames:
87 names.append(entry[1])
88 self.assertEqual(names, ['tls.tests.dnsdist.org', 'powerdns.com', '127.0.0.1'])
89
90 # and that the serial is different
91 self.assertNotEqual(serialNumber, cert['serialNumber'])
92 conn.close()
93
94 def testTLKA(self):
95 """
96 TLS: Several queries over the same connection
97 """
98 name = 'ka.tls.tests.powerdns.com.'
99 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
100 response = dns.message.make_response(query)
101 rrset = dns.rrset.from_text(name,
102 3600,
103 dns.rdataclass.IN,
104 dns.rdatatype.A,
105 '127.0.0.1')
106 response.answer.append(rrset)
107
108 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
109
110 for idx in range(5):
111 self.sendTCPQueryOverConnection(conn, query, response=response)
112 (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
113 self.assertTrue(receivedQuery)
114 self.assertTrue(receivedResponse)
115 receivedQuery.id = query.id
116 self.assertEqual(query, receivedQuery)
117 self.assertEqual(response, receivedResponse)
118
119 conn.close()
120
121 def testTLSPipelining(self):
122 """
123 TLS: Several queries over the same connection without waiting for the responses
124 """
125 name = 'pipelining.tls.tests.powerdns.com.'
126 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
127 response = dns.message.make_response(query)
128 rrset = dns.rrset.from_text(name,
129 3600,
130 dns.rdataclass.IN,
131 dns.rdatatype.A,
132 '127.0.0.1')
133 response.answer.append(rrset)
134
135 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
136
137 for idx in range(100):
138 self.sendTCPQueryOverConnection(conn, query, response=response)
139
140 for idx in range(100):
141 (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
142 self.assertTrue(receivedQuery)
143 self.assertTrue(receivedResponse)
144 receivedQuery.id = query.id
145 self.assertEqual(query, receivedQuery)
146 self.assertEqual(response, receivedResponse)
147
148 conn.close()
149
150 def testTLSSNIRouting(self):
151 """
152 TLS: SNI Routing
153 """
154 name = 'sni.tls.tests.powerdns.com.'
155 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
156 query.flags &= ~dns.flags.RD
157 response = dns.message.make_response(query)
158 rrset = dns.rrset.from_text(name,
159 3600,
160 dns.rdataclass.IN,
161 dns.rdatatype.A,
162 '127.0.0.1')
163 response.answer.append(rrset)
164 expectedResponse = dns.message.make_response(query)
165 rrset = dns.rrset.from_text(name,
166 3600,
167 dns.rdataclass.IN,
168 dns.rdatatype.A,
169 '1.2.3.4')
170 expectedResponse.answer.append(rrset)
171
172 # this SNI should match so we should get a spoofed answer
173 conn = self.openTLSConnection(self._tlsServerPort, 'powerdns.com', self._caCert)
174
175 self.sendTCPQueryOverConnection(conn, query, response=None)
176 receivedResponse = self.recvTCPResponseOverConnection(conn, useQueue=False)
177 self.assertTrue(receivedResponse)
178 self.assertEqual(expectedResponse, receivedResponse)
179
180 conn.close()
181 # this one should not
182 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
183
184 self.sendTCPQueryOverConnection(conn, query, response=response)
185 (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
186 self.assertTrue(receivedQuery)
187 self.assertTrue(receivedResponse)
188 receivedQuery.id = query.id
189 self.assertEqual(query, receivedQuery)
190 self.assertEqual(response, receivedResponse)
191 conn.close()
192
193 def testTLSSNIRoutingAfterResumption(self):
194 # we have more complicated tests about session resumption itself,
195 # but here we want to make sure the SNI is still present after resumption
196 """
197 TLS: SNI Routing after resumption
198 """
199 name = 'sni-resumed.tls.tests.powerdns.com.'
200 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
201 query.flags &= ~dns.flags.RD
202 response = dns.message.make_response(query)
203 rrset = dns.rrset.from_text(name,
204 3600,
205 dns.rdataclass.IN,
206 dns.rdatatype.A,
207 '127.0.0.1')
208 response.answer.append(rrset)
209 expectedResponse = dns.message.make_response(query)
210 rrset = dns.rrset.from_text(name,
211 3600,
212 dns.rdataclass.IN,
213 dns.rdatatype.A,
214 '1.2.3.4')
215 expectedResponse.answer.append(rrset)
216
217 # this SNI should match so we should get a spoofed answer
218 sslctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2)
219 sslctx.check_hostname = True
220 sslctx.verify_mode = ssl.CERT_REQUIRED
221 sslctx.load_verify_locations(self._caCert)
222
223 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
224 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
225 sock.settimeout(2.0)
226 sslsock = sslctx.wrap_socket(sock, server_hostname='powerdns.com')
227 sslsock.connect(("127.0.0.1", self._tlsServerPort))
228
229 self.sendTCPQueryOverConnection(sslsock, query, response=None)
230 receivedResponse = self.recvTCPResponseOverConnection(sslsock, useQueue=False)
231 self.assertTrue(receivedResponse)
232 self.assertEqual(expectedResponse, receivedResponse)
233 self.assertFalse(sslsock.session_reused)
234 session = sslsock.session
235
236 # this one should not (different SNI)
237 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
238 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
239 sock.settimeout(2.0)
240 sslsock = sslctx.wrap_socket(sock, server_hostname=self._serverName)
241 sslsock.connect(("127.0.0.1", self._tlsServerPort))
242
243 self.sendTCPQueryOverConnection(sslsock, query, response=response)
244 (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(sslsock, useQueue=True)
245 self.assertTrue(receivedQuery)
246 self.assertTrue(receivedResponse)
247 receivedQuery.id = query.id
248 self.assertEqual(query, receivedQuery)
249 self.assertEqual(response, receivedResponse)
250 self.assertFalse(sslsock.session_reused)
251
252 # and now we should be able to resume the session
253 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
254 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
255 sock.settimeout(2.0)
256 sslsock = sslctx.wrap_socket(sock, server_hostname='powerdns.com')
257 sslsock.session = session
258 sslsock.connect(("127.0.0.1", self._tlsServerPort))
259
260 self.sendTCPQueryOverConnection(sslsock, query, response=None)
261 receivedResponse = self.recvTCPResponseOverConnection(sslsock, useQueue=False)
262 self.assertTrue(receivedResponse)
263 self.assertEqual(expectedResponse, receivedResponse)
264 self.assertTrue(sslsock.session_reused)
265
266 class TestOpenSSL(DNSDistTest, TLSTests):
267
268 _extraStartupSleep = 1
269 _consoleKey = DNSDistTest.generateConsoleKey()
270 _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
271 _serverKey = 'server-tls.key'
272 _serverCert = 'server-tls.chain'
273 _serverName = 'tls.tests.dnsdist.org'
274 _caCert = 'ca.pem'
275 _tlsServerPort = pickAvailablePort()
276 _config_template = """
277 setKey("%s")
278 controlSocket("127.0.0.1:%s")
279
280 newServer{address="127.0.0.1:%s"}
281 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
282 addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
283 """
284 _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
285
286 @classmethod
287 def setUpClass(cls):
288 cls.generateNewCertificateAndKey('server-tls')
289 cls.startResponders()
290 cls.startDNSDist()
291 cls.setUpSockets()
292
293 def testProvider(self):
294 self.assertEqual(self.getTLSProvider(), "openssl")
295
296 class TestGnuTLS(DNSDistTest, TLSTests):
297
298 _consoleKey = DNSDistTest.generateConsoleKey()
299 _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
300 _serverKey = 'server-tls.key'
301 _serverCert = 'server-tls.chain'
302 _serverName = 'tls.tests.dnsdist.org'
303 _caCert = 'ca.pem'
304 _tlsServerPort = pickAvailablePort()
305 _config_template = """
306 setKey("%s")
307 controlSocket("127.0.0.1:%s")
308
309 newServer{address="127.0.0.1:%s"}
310 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls" })
311 addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
312 """
313 _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
314
315 @classmethod
316 def setUpClass(cls):
317 cls.generateNewCertificateAndKey('server-tls')
318 cls.startResponders()
319 cls.startDNSDist()
320 cls.setUpSockets()
321
322 def testProvider(self):
323 self.assertEqual(self.getTLSProvider(), "gnutls")
324
325 class TestDOTWithCache(DNSDistTest):
326 _serverKey = 'server.key'
327 _serverCert = 'server.chain'
328 _serverName = 'tls.tests.dnsdist.org'
329 _caCert = 'ca.pem'
330 _tlsServerPort = pickAvailablePort()
331 _config_template = """
332 newServer{address="127.0.0.1:%s"}
333
334 addTLSLocal("127.0.0.1:%s", "%s", "%s")
335
336 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
337 getPool(""):setCache(pc)
338 """
339 _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
340
341 def testDOTCacheLargeAnswer(self):
342 """
343 DOT with cache: Check that we can cache (and retrieve) large answers
344 """
345 numberOfQueries = 10
346 name = 'large.dot-with-cache.tests.powerdns.com.'
347 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
348 query.id = 0
349 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
350 expectedQuery.id = 0
351 response = dns.message.make_response(query)
352 # we prepare a large answer
353 content = ""
354 for i in range(44):
355 if len(content) > 0:
356 content = content + ', '
357 content = content + (str(i)*50)
358 # pad up to 4096
359 content = content + 'A'*40
360
361 rrset = dns.rrset.from_text(name,
362 3600,
363 dns.rdataclass.IN,
364 dns.rdatatype.TXT,
365 content)
366 response.answer.append(rrset)
367 self.assertEqual(len(response.to_wire()), 4096)
368
369 # first query to fill the cache
370 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
371 self.sendTCPQueryOverConnection(conn, query, response=response)
372 (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
373
374 self.assertTrue(receivedQuery)
375 self.assertTrue(receivedResponse)
376 receivedQuery.id = expectedQuery.id
377 self.assertEqual(expectedQuery, receivedQuery)
378 self.checkQueryNoEDNS(expectedQuery, receivedQuery)
379 self.assertEqual(response, receivedResponse)
380 conn.close()
381
382 for _ in range(numberOfQueries):
383 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
384 self.sendTCPQueryOverConnection(conn, query, response=None)
385 receivedResponse = self.recvTCPResponseOverConnection(conn, useQueue=False)
386 self.assertEqual(receivedResponse, response)
387 conn.close()
388
389 class TestTLSFrontendLimits(DNSDistTest):
390
391 # this test suite uses a different responder port
392 # because it uses a different health check configuration
393 _testServerPort = pickAvailablePort()
394 _answerUnexpected = True
395
396 _serverKey = 'server.key'
397 _serverCert = 'server.chain'
398 _serverName = 'tls.tests.dnsdist.org'
399 _caCert = 'ca.pem'
400 _tlsServerPort = pickAvailablePort()
401
402 _skipListeningOnCL = True
403 _tcpIdleTimeout = 2
404 _maxTCPConnsPerTLSFrontend = 5
405 _config_template = """
406 newServer{address="127.0.0.1:%s"}
407 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", maxConcurrentTCPConnections=%d })
408 """
409 _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerTLSFrontend']
410 _alternateListeningAddr = '127.0.0.1'
411 _alternateListeningPort = _tlsServerPort
412
413 def testTCPConnsPerTLSFrontend(self):
414 """
415 TLS Frontend Limits: Maximum number of conns per TLS frontend
416 """
417 name = 'maxconnspertlsfrontend.tls.tests.powerdns.com.'
418 query = dns.message.make_query(name, 'A', 'IN')
419 conns = []
420
421 for idx in range(self._maxTCPConnsPerTLSFrontend + 1):
422 try:
423 conns.append(self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert))
424 except:
425 conns.append(None)
426
427 count = 0
428 failed = 0
429 for conn in conns:
430 if not conn:
431 failed = failed + 1
432 continue
433
434 try:
435 self.sendTCPQueryOverConnection(conn, query)
436 response = self.recvTCPResponseOverConnection(conn)
437 if response:
438 count = count + 1
439 else:
440 failed = failed + 1
441 except:
442 failed = failed + 1
443
444 for conn in conns:
445 if conn:
446 conn.close()
447
448 # wait a bit to be sure that dnsdist closed the connections
449 # and decremented the counters on its side, otherwise subsequent
450 # connections will be dropped
451 time.sleep(1)
452
453 self.assertEqual(count, self._maxTCPConnsPerTLSFrontend)
454 self.assertEqual(failed, 1)
455
456 class TestProtocols(DNSDistTest):
457 _serverKey = 'server.key'
458 _serverCert = 'server.chain'
459 _serverName = 'tls.tests.dnsdist.org'
460 _caCert = 'ca.pem'
461 _tlsServerPort = pickAvailablePort()
462
463 _config_template = """
464 function checkDOT(dq)
465 if dq:getProtocol() ~= "DNS over TLS" then
466 return DNSAction.Spoof, '1.2.3.4'
467 end
468 return DNSAction.None
469 end
470
471 addAction("protocols.tls.tests.powerdns.com.", LuaAction(checkDOT))
472 newServer{address="127.0.0.1:%s"}
473 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
474 """
475 _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
476
477 def testProtocolDOT(self):
478 """
479 DoT: Test DNSQuestion.Protocol
480 """
481 name = 'protocols.tls.tests.powerdns.com.'
482 query = dns.message.make_query(name, 'A', 'IN')
483 response = dns.message.make_response(query)
484
485 conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
486 self.sendTCPQueryOverConnection(conn, query, response=response)
487 (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
488 self.assertTrue(receivedQuery)
489 self.assertTrue(receivedResponse)
490 receivedQuery.id = query.id
491 self.assertEqual(query, receivedQuery)
492 self.assertEqual(response, receivedResponse)
493 conn.close()
494
495 class TestPKCSTLSCertificate(DNSDistTest, TLSTests):
496 _consoleKey = DNSDistTest.generateConsoleKey()
497 _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
498 _serverCert = 'server-tls.p12'
499 _pkcsPassphrase = 'passw0rd'
500 _serverName = 'tls.tests.dnsdist.org'
501 _caCert = 'ca.pem'
502 _tlsServerPort = pickAvailablePort()
503 _config_template = """
504 setKey("%s")
505 controlSocket("127.0.0.1:%s")
506 newServer{address="127.0.0.1:%s"}
507 cert=newTLSCertificate("%s", {password="%s"})
508 addTLSLocal("127.0.0.1:%s", cert, "", { provider="openssl" })
509 addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
510 """
511 _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_serverCert', '_pkcsPassphrase', '_tlsServerPort']
512
513 @classmethod
514 def setUpClass(cls):
515 cls.generateNewCertificateAndKey('server-tls')
516 cls.startResponders()
517 cls.startDNSDist()
518 cls.setUpSockets()