9 from dnsdisttests
import DNSDistTest
, pickAvailablePort
11 class TLSTests(object):
13 def getServerCertificate(self
):
14 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
15 cert
= conn
.getpeercert()
19 def getTLSProvider(self
):
20 return self
.sendConsoleCommand("getBind(0):getEffectiveTLSProvider()").rstrip()
22 def testTLSSimple(self
):
26 name
= 'single.tls.tests.powerdns.com.'
27 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
28 response
= dns
.message
.make_response(query
)
29 rrset
= dns
.rrset
.from_text(name
,
34 response
.answer
.append(rrset
)
36 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
38 self
.sendTCPQueryOverConnection(conn
, query
, response
=response
)
39 (receivedQuery
, receivedResponse
) = self
.recvTCPResponseOverConnection(conn
, useQueue
=True)
40 self
.assertTrue(receivedQuery
)
41 self
.assertTrue(receivedResponse
)
42 receivedQuery
.id = query
.id
43 self
.assertEqual(query
, receivedQuery
)
44 self
.assertEqual(response
, receivedResponse
)
46 # check the certificate
47 cert
= self
.getServerCertificate()
48 self
.assertIn('subject', cert
)
49 self
.assertIn('serialNumber', cert
)
50 self
.assertIn('subjectAltName', cert
)
51 subject
= cert
['subject']
52 altNames
= cert
['subjectAltName']
53 self
.assertEqual(dict(subject
[0])['commonName'], 'tls.tests.dnsdist.org')
54 self
.assertEqual(dict(subject
[1])['organizationalUnitName'], 'PowerDNS.com BV')
56 for entry
in altNames
:
57 names
.append(entry
[1])
58 self
.assertEqual(names
, ['tls.tests.dnsdist.org', 'powerdns.com', '127.0.0.1'])
59 serialNumber
= cert
['serialNumber']
61 self
.generateNewCertificateAndKey('server-tls')
62 self
.sendConsoleCommand("reloadAllCertificates()")
65 # open a new connection
66 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
68 self
.sendTCPQueryOverConnection(conn
, query
, response
=response
)
69 (receivedQuery
, receivedResponse
) = self
.recvTCPResponseOverConnection(conn
, useQueue
=True)
70 self
.assertTrue(receivedQuery
)
71 self
.assertTrue(receivedResponse
)
72 receivedQuery
.id = query
.id
73 self
.assertEqual(query
, receivedQuery
)
74 self
.assertEqual(response
, receivedResponse
)
76 # check that the certificate is OK
77 cert
= self
.getServerCertificate()
78 self
.assertIn('subject', cert
)
79 self
.assertIn('serialNumber', cert
)
80 self
.assertIn('subjectAltName', cert
)
81 subject
= cert
['subject']
82 altNames
= cert
['subjectAltName']
83 self
.assertEqual(dict(subject
[0])['commonName'], 'tls.tests.dnsdist.org')
84 self
.assertEqual(dict(subject
[1])['organizationalUnitName'], 'PowerDNS.com BV')
86 for entry
in altNames
:
87 names
.append(entry
[1])
88 self
.assertEqual(names
, ['tls.tests.dnsdist.org', 'powerdns.com', '127.0.0.1'])
90 # and that the serial is different
91 self
.assertNotEqual(serialNumber
, cert
['serialNumber'])
96 TLS: Several queries over the same connection
98 name
= 'ka.tls.tests.powerdns.com.'
99 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
100 response
= dns
.message
.make_response(query
)
101 rrset
= dns
.rrset
.from_text(name
,
106 response
.answer
.append(rrset
)
108 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
111 self
.sendTCPQueryOverConnection(conn
, query
, response
=response
)
112 (receivedQuery
, receivedResponse
) = self
.recvTCPResponseOverConnection(conn
, useQueue
=True)
113 self
.assertTrue(receivedQuery
)
114 self
.assertTrue(receivedResponse
)
115 receivedQuery
.id = query
.id
116 self
.assertEqual(query
, receivedQuery
)
117 self
.assertEqual(response
, receivedResponse
)
121 def testTLSPipelining(self
):
123 TLS: Several queries over the same connection without waiting for the responses
125 name
= 'pipelining.tls.tests.powerdns.com.'
126 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
127 response
= dns
.message
.make_response(query
)
128 rrset
= dns
.rrset
.from_text(name
,
133 response
.answer
.append(rrset
)
135 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
137 for idx
in range(100):
138 self
.sendTCPQueryOverConnection(conn
, query
, response
=response
)
140 for idx
in range(100):
141 (receivedQuery
, receivedResponse
) = self
.recvTCPResponseOverConnection(conn
, useQueue
=True)
142 self
.assertTrue(receivedQuery
)
143 self
.assertTrue(receivedResponse
)
144 receivedQuery
.id = query
.id
145 self
.assertEqual(query
, receivedQuery
)
146 self
.assertEqual(response
, receivedResponse
)
150 def testTLSSNIRouting(self
):
154 name
= 'sni.tls.tests.powerdns.com.'
155 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
156 query
.flags
&= ~dns
.flags
.RD
157 response
= dns
.message
.make_response(query
)
158 rrset
= dns
.rrset
.from_text(name
,
163 response
.answer
.append(rrset
)
164 expectedResponse
= dns
.message
.make_response(query
)
165 rrset
= dns
.rrset
.from_text(name
,
170 expectedResponse
.answer
.append(rrset
)
172 # this SNI should match so we should get a spoofed answer
173 conn
= self
.openTLSConnection(self
._tlsServerPort
, 'powerdns.com', self
._caCert
)
175 self
.sendTCPQueryOverConnection(conn
, query
, response
=None)
176 receivedResponse
= self
.recvTCPResponseOverConnection(conn
, useQueue
=False)
177 self
.assertTrue(receivedResponse
)
178 self
.assertEqual(expectedResponse
, receivedResponse
)
181 # this one should not
182 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
184 self
.sendTCPQueryOverConnection(conn
, query
, response
=response
)
185 (receivedQuery
, receivedResponse
) = self
.recvTCPResponseOverConnection(conn
, useQueue
=True)
186 self
.assertTrue(receivedQuery
)
187 self
.assertTrue(receivedResponse
)
188 receivedQuery
.id = query
.id
189 self
.assertEqual(query
, receivedQuery
)
190 self
.assertEqual(response
, receivedResponse
)
193 def testTLSSNIRoutingAfterResumption(self
):
194 # we have more complicated tests about session resumption itself,
195 # but here we want to make sure the SNI is still present after resumption
197 TLS: SNI Routing after resumption
199 name
= 'sni-resumed.tls.tests.powerdns.com.'
200 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
201 query
.flags
&= ~dns
.flags
.RD
202 response
= dns
.message
.make_response(query
)
203 rrset
= dns
.rrset
.from_text(name
,
208 response
.answer
.append(rrset
)
209 expectedResponse
= dns
.message
.make_response(query
)
210 rrset
= dns
.rrset
.from_text(name
,
215 expectedResponse
.answer
.append(rrset
)
217 # this SNI should match so we should get a spoofed answer
218 sslctx
= ssl
.SSLContext(protocol
=ssl
.PROTOCOL_TLSv1_2
)
219 sslctx
.check_hostname
= True
220 sslctx
.verify_mode
= ssl
.CERT_REQUIRED
221 sslctx
.load_verify_locations(self
._caCert
)
223 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
224 sock
.setsockopt(socket
.IPPROTO_TCP
, socket
.TCP_NODELAY
, 1)
226 sslsock
= sslctx
.wrap_socket(sock
, server_hostname
='powerdns.com')
227 sslsock
.connect(("127.0.0.1", self
._tlsServerPort
))
229 self
.sendTCPQueryOverConnection(sslsock
, query
, response
=None)
230 receivedResponse
= self
.recvTCPResponseOverConnection(sslsock
, useQueue
=False)
231 self
.assertTrue(receivedResponse
)
232 self
.assertEqual(expectedResponse
, receivedResponse
)
233 self
.assertFalse(sslsock
.session_reused
)
234 session
= sslsock
.session
236 # this one should not (different SNI)
237 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
238 sock
.setsockopt(socket
.IPPROTO_TCP
, socket
.TCP_NODELAY
, 1)
240 sslsock
= sslctx
.wrap_socket(sock
, server_hostname
=self
._serverName
)
241 sslsock
.connect(("127.0.0.1", self
._tlsServerPort
))
243 self
.sendTCPQueryOverConnection(sslsock
, query
, response
=response
)
244 (receivedQuery
, receivedResponse
) = self
.recvTCPResponseOverConnection(sslsock
, useQueue
=True)
245 self
.assertTrue(receivedQuery
)
246 self
.assertTrue(receivedResponse
)
247 receivedQuery
.id = query
.id
248 self
.assertEqual(query
, receivedQuery
)
249 self
.assertEqual(response
, receivedResponse
)
250 self
.assertFalse(sslsock
.session_reused
)
252 # and now we should be able to resume the session
253 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
254 sock
.setsockopt(socket
.IPPROTO_TCP
, socket
.TCP_NODELAY
, 1)
256 sslsock
= sslctx
.wrap_socket(sock
, server_hostname
='powerdns.com')
257 sslsock
.session
= session
258 sslsock
.connect(("127.0.0.1", self
._tlsServerPort
))
260 self
.sendTCPQueryOverConnection(sslsock
, query
, response
=None)
261 receivedResponse
= self
.recvTCPResponseOverConnection(sslsock
, useQueue
=False)
262 self
.assertTrue(receivedResponse
)
263 self
.assertEqual(expectedResponse
, receivedResponse
)
264 self
.assertTrue(sslsock
.session_reused
)
266 class TestOpenSSL(DNSDistTest
, TLSTests
):
268 _extraStartupSleep
= 1
269 _consoleKey
= DNSDistTest
.generateConsoleKey()
270 _consoleKeyB64
= base64
.b64encode(_consoleKey
).decode('ascii')
271 _serverKey
= 'server-tls.key'
272 _serverCert
= 'server-tls.chain'
273 _serverName
= 'tls.tests.dnsdist.org'
275 _tlsServerPort
= pickAvailablePort()
276 _config_template
= """
278 controlSocket("127.0.0.1:%s")
280 newServer{address="127.0.0.1:%s"}
281 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
282 addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
284 _config_params
= ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
288 cls
.generateNewCertificateAndKey('server-tls')
289 cls
.startResponders()
293 def testProvider(self
):
294 self
.assertEqual(self
.getTLSProvider(), "openssl")
296 class TestGnuTLS(DNSDistTest
, TLSTests
):
298 _consoleKey
= DNSDistTest
.generateConsoleKey()
299 _consoleKeyB64
= base64
.b64encode(_consoleKey
).decode('ascii')
300 _serverKey
= 'server-tls.key'
301 _serverCert
= 'server-tls.chain'
302 _serverName
= 'tls.tests.dnsdist.org'
304 _tlsServerPort
= pickAvailablePort()
305 _config_template
= """
307 controlSocket("127.0.0.1:%s")
309 newServer{address="127.0.0.1:%s"}
310 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls" })
311 addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
313 _config_params
= ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
317 cls
.generateNewCertificateAndKey('server-tls')
318 cls
.startResponders()
322 def testProvider(self
):
323 self
.assertEqual(self
.getTLSProvider(), "gnutls")
325 class TestDOTWithCache(DNSDistTest
):
326 _serverKey
= 'server.key'
327 _serverCert
= 'server.chain'
328 _serverName
= 'tls.tests.dnsdist.org'
330 _tlsServerPort
= pickAvailablePort()
331 _config_template
= """
332 newServer{address="127.0.0.1:%s"}
334 addTLSLocal("127.0.0.1:%s", "%s", "%s")
336 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
337 getPool(""):setCache(pc)
339 _config_params
= ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
341 def testDOTCacheLargeAnswer(self
):
343 DOT with cache: Check that we can cache (and retrieve) large answers
346 name
= 'large.dot-with-cache.tests.powerdns.com.'
347 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
349 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
351 response
= dns
.message
.make_response(query
)
352 # we prepare a large answer
356 content
= content
+ ', '
357 content
= content
+ (str(i
)*50)
359 content
= content
+ 'A'*40
361 rrset
= dns
.rrset
.from_text(name
,
366 response
.answer
.append(rrset
)
367 self
.assertEqual(len(response
.to_wire()), 4096)
369 # first query to fill the cache
370 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
371 self
.sendTCPQueryOverConnection(conn
, query
, response
=response
)
372 (receivedQuery
, receivedResponse
) = self
.recvTCPResponseOverConnection(conn
, useQueue
=True)
374 self
.assertTrue(receivedQuery
)
375 self
.assertTrue(receivedResponse
)
376 receivedQuery
.id = expectedQuery
.id
377 self
.assertEqual(expectedQuery
, receivedQuery
)
378 self
.checkQueryNoEDNS(expectedQuery
, receivedQuery
)
379 self
.assertEqual(response
, receivedResponse
)
382 for _
in range(numberOfQueries
):
383 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
384 self
.sendTCPQueryOverConnection(conn
, query
, response
=None)
385 receivedResponse
= self
.recvTCPResponseOverConnection(conn
, useQueue
=False)
386 self
.assertEqual(receivedResponse
, response
)
389 class TestTLSFrontendLimits(DNSDistTest
):
391 # this test suite uses a different responder port
392 # because it uses a different health check configuration
393 _testServerPort
= pickAvailablePort()
394 _answerUnexpected
= True
396 _serverKey
= 'server.key'
397 _serverCert
= 'server.chain'
398 _serverName
= 'tls.tests.dnsdist.org'
400 _tlsServerPort
= pickAvailablePort()
402 _skipListeningOnCL
= True
404 _maxTCPConnsPerTLSFrontend
= 5
405 _config_template
= """
406 newServer{address="127.0.0.1:%s"}
407 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", maxConcurrentTCPConnections=%d })
409 _config_params
= ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerTLSFrontend']
410 _alternateListeningAddr
= '127.0.0.1'
411 _alternateListeningPort
= _tlsServerPort
413 def testTCPConnsPerTLSFrontend(self
):
415 TLS Frontend Limits: Maximum number of conns per TLS frontend
417 name
= 'maxconnspertlsfrontend.tls.tests.powerdns.com.'
418 query
= dns
.message
.make_query(name
, 'A', 'IN')
421 for idx
in range(self
._maxTCPConnsPerTLSFrontend
+ 1):
423 conns
.append(self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
))
435 self
.sendTCPQueryOverConnection(conn
, query
)
436 response
= self
.recvTCPResponseOverConnection(conn
)
448 # wait a bit to be sure that dnsdist closed the connections
449 # and decremented the counters on its side, otherwise subsequent
450 # connections will be dropped
453 self
.assertEqual(count
, self
._maxTCPConnsPerTLSFrontend
)
454 self
.assertEqual(failed
, 1)
456 class TestProtocols(DNSDistTest
):
457 _serverKey
= 'server.key'
458 _serverCert
= 'server.chain'
459 _serverName
= 'tls.tests.dnsdist.org'
461 _tlsServerPort
= pickAvailablePort()
463 _config_template
= """
464 function checkDOT(dq)
465 if dq:getProtocol() ~= "DNS over TLS" then
466 return DNSAction.Spoof, '1.2.3.4'
468 return DNSAction.None
471 addAction("protocols.tls.tests.powerdns.com.", LuaAction(checkDOT))
472 newServer{address="127.0.0.1:%s"}
473 addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
475 _config_params
= ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
477 def testProtocolDOT(self
):
479 DoT: Test DNSQuestion.Protocol
481 name
= 'protocols.tls.tests.powerdns.com.'
482 query
= dns
.message
.make_query(name
, 'A', 'IN')
483 response
= dns
.message
.make_response(query
)
485 conn
= self
.openTLSConnection(self
._tlsServerPort
, self
._serverName
, self
._caCert
)
486 self
.sendTCPQueryOverConnection(conn
, query
, response
=response
)
487 (receivedQuery
, receivedResponse
) = self
.recvTCPResponseOverConnection(conn
, useQueue
=True)
488 self
.assertTrue(receivedQuery
)
489 self
.assertTrue(receivedResponse
)
490 receivedQuery
.id = query
.id
491 self
.assertEqual(query
, receivedQuery
)
492 self
.assertEqual(response
, receivedResponse
)
495 class TestPKCSTLSCertificate(DNSDistTest
, TLSTests
):
496 _consoleKey
= DNSDistTest
.generateConsoleKey()
497 _consoleKeyB64
= base64
.b64encode(_consoleKey
).decode('ascii')
498 _serverCert
= 'server-tls.p12'
499 _pkcsPassphrase
= 'passw0rd'
500 _serverName
= 'tls.tests.dnsdist.org'
502 _tlsServerPort
= pickAvailablePort()
503 _config_template
= """
505 controlSocket("127.0.0.1:%s")
506 newServer{address="127.0.0.1:%s"}
507 cert=newTLSCertificate("%s", {password="%s"})
508 addTLSLocal("127.0.0.1:%s", cert, "", { provider="openssl" })
509 addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
511 _config_params
= ['_consoleKeyB64', '_consolePort', '_testServerPort', '_serverCert', '_pkcsPassphrase', '_tlsServerPort']
515 cls
.generateNewCertificateAndKey('server-tls')
516 cls
.startResponders()