git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_TLS.py
Merge pull request #13917 from Habbie/auth-4.9.0-docs-secpoll
[thirdparty/pdns.git] / regression-tests.dnsdist / test_TLS.py
CommitLineData
a227f47d 1#!/usr/bin/env python
1d896c34 2import base64
a227f47d 3import dns
5f4156be
RG
4import socket
5import ssl
1d896c34 6import subprocess
d4d57f56 7import time
5f4156be 8import unittest
630eb526 9from dnsdisttests import DNSDistTest, pickAvailablePort
a227f47d 10
class TLSTests(object):
    """
    DNS over TLS test cases shared by the provider-specific test classes.

    This mixin does not work on its own: the methods below read
    `_tlsServerPort`, `_serverName` and `_caCert`, and call
    `openTLSConnection()`, `sendTCPQueryOverConnection()`,
    `recvTCPResponseOverConnection()`, `sendConsoleCommand()`,
    `generateNewCertificateAndKey()` and the unittest assertions, none of
    which are defined here. The class it is mixed into has to provide them.
    """

    def getServerCertificate(self):
        """
        Return what getpeercert() reports for a fresh connection to the TLS frontend
        """
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
        try:
            return conn.getpeercert()
        finally:
            # do not leak the connection if getpeercert() raises
            conn.close()

    def getTLSProvider(self):
        """
        Return the effective TLS provider of the first bind, as reported by the console
        """
        return self.sendConsoleCommand("getBind(0):getEffectiveTLSProvider()").rstrip()

    @staticmethod
    def _makeAResponse(query, name, address):
        """
        Build a response to `query` holding a single A record (TTL 3600) for `name`
        """
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    address)
        response.answer.append(rrset)
        return response

    def _checkAnswerFromQueue(self, conn, query, response):
        """
        Read one (query, response) pair with useQueue=True and compare both to the expected ones
        """
        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the ID of the received query is overwritten before the comparison,
        # so a different ID does not make the test fail
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def _checkForwardedQuery(self, conn, query, response):
        """
        Send `query` (registering `response` as the answer) and check what comes back
        """
        self.sendTCPQueryOverConnection(conn, query, response=response)
        self._checkAnswerFromQueue(conn, query, response)

    def _checkSpoofedAnswer(self, conn, query, expectedResponse):
        """
        Send `query` without registering any answer and expect `expectedResponse` back
        """
        self.sendTCPQueryOverConnection(conn, query, response=None)
        receivedResponse = self.recvTCPResponseOverConnection(conn, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse, receivedResponse)

    def _checkServerCertificate(self):
        """
        Check the subject and alternative names of the frontend certificate, return its serial
        """
        cert = self.getServerCertificate()
        self.assertIn('subject', cert)
        self.assertIn('serialNumber', cert)
        self.assertIn('subjectAltName', cert)
        subject = cert['subject']
        self.assertEqual(dict(subject[0])['commonName'], 'tls.tests.dnsdist.org')
        self.assertEqual(dict(subject[1])['organizationalUnitName'], 'PowerDNS.com BV')
        # each alternative name entry is a (type, value) pair, only the value is compared
        names = [entry[1] for entry in cert['subjectAltName']]
        self.assertEqual(names, ['tls.tests.dnsdist.org', 'powerdns.com', '127.0.0.1'])
        return cert['serialNumber']

    def _connectWithContext(self, sslctx, serverName, session=None):
        """
        Open a TLS connection to the frontend using `sslctx` and `serverName` as SNI.

        When `session` is given it is offered for resumption. The caller is
        responsible for closing the returned socket.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.settimeout(2.0)
        sslsock = sslctx.wrap_socket(sock, server_hostname=serverName)
        try:
            if session is not None:
                # has to be set before the handshake, which happens in connect()
                sslsock.session = session
            sslsock.connect(("127.0.0.1", self._tlsServerPort))
        except Exception:
            sslsock.close()
            raise
        return sslsock

    def testTLSSimple(self):
        """
        TLS: Single query
        """
        name = 'single.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = self._makeAResponse(query, name, '127.0.0.1')

        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
        self._checkForwardedQuery(conn, query, response)

        # check the certificate
        serialNumber = self._checkServerCertificate()

        self.generateNewCertificateAndKey('server-tls')
        self.sendConsoleCommand("reloadAllCertificates()")

        conn.close()
        # open a new connection
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
        self._checkForwardedQuery(conn, query, response)

        # check that the certificate is OK
        newSerialNumber = self._checkServerCertificate()

        # and that the serial is different
        self.assertNotEqual(serialNumber, newSerialNumber)
        conn.close()

    def testTLKA(self):
        """
        TLS: Several queries over the same connection
        """
        name = 'ka.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = self._makeAResponse(query, name, '127.0.0.1')

        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        for _ in range(5):
            self._checkForwardedQuery(conn, query, response)

        conn.close()

    def testTLSPipelining(self):
        """
        TLS: Several queries over the same connection without waiting for the responses
        """
        name = 'pipelining.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = self._makeAResponse(query, name, '127.0.0.1')

        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        # send everything first, only then start reading the answers
        for _ in range(100):
            self.sendTCPQueryOverConnection(conn, query, response=response)

        for _ in range(100):
            self._checkAnswerFromQueue(conn, query, response)

        conn.close()

    def testTLSSNIRouting(self):
        """
        TLS: SNI Routing
        """
        name = 'sni.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.flags &= ~dns.flags.RD
        response = self._makeAResponse(query, name, '127.0.0.1')
        expectedResponse = self._makeAResponse(query, name, '1.2.3.4')

        # this SNI should match so we should get a spoofed answer
        conn = self.openTLSConnection(self._tlsServerPort, 'powerdns.com', self._caCert)
        self._checkSpoofedAnswer(conn, query, expectedResponse)
        conn.close()

        # this one should not
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
        self._checkForwardedQuery(conn, query, response)
        conn.close()

    def testTLSSNIRoutingAfterResumption(self):
        """
        TLS: SNI Routing after resumption
        """
        # we have more complicated tests about session resumption itself,
        # but here we want to make sure the SNI is still present after resumption
        name = 'sni-resumed.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.flags &= ~dns.flags.RD
        response = self._makeAResponse(query, name, '127.0.0.1')
        expectedResponse = self._makeAResponse(query, name, '1.2.3.4')

        sslctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2)
        sslctx.check_hostname = True
        sslctx.verify_mode = ssl.CERT_REQUIRED
        sslctx.load_verify_locations(self._caCert)

        # this SNI should match so we should get a spoofed answer
        sslsock = self._connectWithContext(sslctx, 'powerdns.com')
        try:
            self._checkSpoofedAnswer(sslsock, query, expectedResponse)
            self.assertFalse(sslsock.session_reused)
            # keep the session around, it is offered again below
            session = sslsock.session
        finally:
            sslsock.close()

        # this one should not (different SNI)
        sslsock = self._connectWithContext(sslctx, self._serverName)
        try:
            self._checkForwardedQuery(sslsock, query, response)
            self.assertFalse(sslsock.session_reused)
        finally:
            sslsock.close()

        # and now we should be able to resume the session
        sslsock = self._connectWithContext(sslctx, 'powerdns.com', session=session)
        try:
            self._checkSpoofedAnswer(sslsock, query, expectedResponse)
            self.assertTrue(sslsock.session_reused)
        finally:
            sslsock.close()
265
class TestOpenSSL(DNSDistTest, TLSTests):
    """
    Run the shared TLS test cases against a frontend configured with provider="openssl"
    """

    _extraStartupSleep = 1

    # console access, used by the tests that send console commands
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')

    # certificate material and TLS frontend settings
    _serverKey = 'server-tls.key'
    _serverCert = 'server-tls.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = pickAvailablePort()

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")

    newServer{address="127.0.0.1:%s"}
    addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
    addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
    """
    # names of the attributes filling the placeholders above, in order
    _config_params = [
        '_consoleKeyB64',
        '_consolePort',
        '_testServerPort',
        '_tlsServerPort',
        '_serverCert',
        '_serverKey',
    ]

    @classmethod
    def setUpClass(cls):
        """
        Generate a new 'server-tls' certificate and key, then start the responders and dnsdist
        """
        cls.generateNewCertificateAndKey('server-tls')
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

    def testProvider(self):
        """
        TLS: The effective provider is the one requested in the configuration
        """
        provider = self.getTLSProvider()
        self.assertEqual(provider, "openssl")
98222dfc 295
5f4156be
RG
class TestGnuTLS(DNSDistTest, TLSTests):
    """
    Run the shared TLS test cases against a frontend configured with provider="gnutls"
    """

    # console access, used by the tests that send console commands
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')

    # certificate material and TLS frontend settings
    _serverKey = 'server-tls.key'
    _serverCert = 'server-tls.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = pickAvailablePort()

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")

    newServer{address="127.0.0.1:%s"}
    addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls" })
    addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
    """
    # names of the attributes filling the placeholders above, in order
    _config_params = [
        '_consoleKeyB64',
        '_consolePort',
        '_testServerPort',
        '_tlsServerPort',
        '_serverCert',
        '_serverKey',
    ]

    @classmethod
    def setUpClass(cls):
        """
        Generate a new 'server-tls' certificate and key, then start the responders and dnsdist
        """
        cls.generateNewCertificateAndKey('server-tls')
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

    def testProvider(self):
        """
        TLS: The effective provider is the one requested in the configuration
        """
        provider = self.getTLSProvider()
        self.assertEqual(provider, "gnutls")
98222dfc 324
class TestDOTWithCache(DNSDistTest):
    """
    DNS over TLS frontend with a packet cache attached to the default pool
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addTLSLocal("127.0.0.1:%s", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    # names of the attributes filling the placeholders above, in order
    _config_params = [
        '_testServerPort',
        '_tlsServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def testDOTCacheLargeAnswer(self):
        """
        DOT with cache: Check that we can cache (and retrieve) large answers
        """
        numberOfQueries = 10
        name = 'large.dot-with-cache.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0

        # we prepare a large answer: 44 comma-separated chunks,
        # each one being the chunk index repeated 50 times
        chunks = [str(i) * 50 for i in range(44)]
        # pad up to 4096
        content = ', '.join(chunks) + 'A' * 40

        response = dns.message.make_response(query)
        largeRRSet = dns.rrset.from_text(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.TXT,
                                         content)
        response.answer.append(largeRRSet)
        self.assertEqual(len(response.to_wire()), 4096)

        # first query to fill the cache
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
        self.sendTCPQueryOverConnection(conn, query, response=response)
        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)

        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryNoEDNS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        conn.close()

        # the following queries use a new connection each time and do not
        # register any answer, yet the same response is expected back
        for _ in range(numberOfQueries):
            conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
            self.sendTCPQueryOverConnection(conn, query, response=None)
            cachedResponse = self.recvTCPResponseOverConnection(conn, useQueue=False)
            self.assertEqual(cachedResponse, response)
            conn.close()
d4d57f56
RG
388
class TestTLSFrontendLimits(DNSDistTest):
    """
    Check the maxConcurrentTCPConnections limit of a TLS frontend
    """

    # this test suite uses a different responder port
    # because it uses a different health check configuration
    _testServerPort = pickAvailablePort()
    _answerUnexpected = True

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = pickAvailablePort()

    _skipListeningOnCL = True
    _tcpIdleTimeout = 2
    _maxTCPConnsPerTLSFrontend = 5
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", maxConcurrentTCPConnections=%d })
    """
    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerTLSFrontend']
    _alternateListeningAddr = '127.0.0.1'
    _alternateListeningPort = _tlsServerPort

    def testTCPConnsPerTLSFrontend(self):
        """
        TLS Frontend Limits: Maximum number of conns per TLS frontend
        """
        name = 'maxconnspertlsfrontend.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        conns = []

        # open one connection more than the frontend allows
        for _ in range(self._maxTCPConnsPerTLSFrontend + 1):
            try:
                conns.append(self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert))
            except Exception:
                # a failed connection is expected for the extra one, but do not
                # swallow KeyboardInterrupt or SystemExit like a bare except would
                conns.append(None)

        count = 0
        failed = 0
        for conn in conns:
            if conn is None:
                failed = failed + 1
                continue

            try:
                self.sendTCPQueryOverConnection(conn, query)
                response = self.recvTCPResponseOverConnection(conn)
            except Exception:
                failed = failed + 1
                continue

            if response:
                count = count + 1
            else:
                failed = failed + 1

        for conn in conns:
            if conn is not None:
                conn.close()

        # wait a bit to be sure that dnsdist closed the connections
        # and decremented the counters on its side, otherwise subsequent
        # connections will be dropped
        time.sleep(1)

        self.assertEqual(count, self._maxTCPConnsPerTLSFrontend)
        self.assertEqual(failed, 1)
7d808ff4
RG
455
class TestProtocols(DNSDistTest):
    """
    Check the protocol reported to Lua for queries received over DNS over TLS
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = pickAvailablePort()

    _config_template = """
    function checkDOT(dq)
      if dq:getProtocol() ~= "DNS over TLS" then
        return DNSAction.Spoof, '1.2.3.4'
      end
      return DNSAction.None
    end

    addAction("protocols.tls.tests.powerdns.com.", LuaAction(checkDOT))
    newServer{address="127.0.0.1:%s"}
    addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
    """
    # names of the attributes filling the placeholders above, in order
    _config_params = [
        '_testServerPort',
        '_tlsServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def testProtocolDOT(self):
        """
        DoT: Test DNSQuestion.Protocol
        """
        name = 'protocols.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)

        # checkDOT() spoofs 1.2.3.4 unless the protocol is "DNS over TLS",
        # so getting our own (empty) response back means the protocol matched
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
        self.sendTCPQueryOverConnection(conn, query, response=response)
        received = self.recvTCPResponseOverConnection(conn, useQueue=True)
        (receivedQuery, receivedResponse) = received
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        conn.close()
5ac11505
CHB
494
class TestPKCSTLSCertificate(DNSDistTest, TLSTests):
    """
    Run the shared TLS test cases with a password-protected 'server-tls.p12'
    certificate loaded through newTLSCertificate()
    """

    # console access, used by the tests that send console commands
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')

    # certificate material and TLS frontend settings
    _serverCert = 'server-tls.p12'
    _pkcsPassphrase = 'passw0rd'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _tlsServerPort = pickAvailablePort()

    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    newServer{address="127.0.0.1:%s"}
    cert=newTLSCertificate("%s", {password="%s"})
    addTLSLocal("127.0.0.1:%s", cert, "", { provider="openssl" })
    addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
    """
    # names of the attributes filling the placeholders above, in order
    _config_params = [
        '_consoleKeyB64',
        '_consolePort',
        '_testServerPort',
        '_serverCert',
        '_pkcsPassphrase',
        '_tlsServerPort',
    ]

    @classmethod
    def setUpClass(cls):
        """
        Generate a new 'server-tls' certificate and key, then start the responders and dnsdist
        """
        cls.generateNewCertificateAndKey('server-tls')
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()