]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add a unit test for SNI routing after session resumption 9921/head
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 7 Jan 2021 16:41:00 +0000 (17:41 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 7 Jan 2021 16:41:00 +0000 (17:41 +0100)
regression-tests.dnsdist/test_TLS.py

index 48de31e67f72c08bb24c484cfcd4fd55215977a4..f2c8a9ba83a3de172488c735ea609296a03b5288 100644 (file)
@@ -1,20 +1,11 @@
 #!/usr/bin/env python
 import dns
+import socket
+import ssl
+import unittest
 from dnsdisttests import DNSDistTest
 
-class TestTLS(DNSDistTest):
-
-    _serverKey = 'server.key'
-    _serverCert = 'server.chain'
-    _serverName = 'tls.tests.dnsdist.org'
-    _caCert = 'ca.pem'
-    _tlsServerPort = 8453
-    _config_template = """
-    newServer{address="127.0.0.1:%s"}
-    addTLSLocal("127.0.0.1:%s", "%s", "%s")
-    addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
-    """
-    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+class TLSTests(object):
 
     def testTLSSimple(self):
         """
@@ -133,6 +124,107 @@ class TestTLS(DNSDistTest):
         self.assertEquals(query, receivedQuery)
         self.assertEquals(response, receivedResponse)
 
+    def testTLSSNIRoutingAfterResumption(self):
+        # we have more complicated tests about session resumption itself,
+        # but here we want to make sure the SNI is still present after resumption
+        """
+        TLS: SNI Routing after resumption
+        """
+        name = 'sni-resumed.tls.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.flags &= ~dns.flags.RD
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '1.2.3.4')
+        expectedResponse.answer.append(rrset)
+
+        # this SNI should match so we should get a spoofed answer
+        sslctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2)
+        sslctx.check_hostname = True
+        sslctx.verify_mode = ssl.CERT_REQUIRED
+        sslctx.load_verify_locations(self._caCert)
+
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        sock.settimeout(2.0)
+        sslsock = sslctx.wrap_socket(sock, server_hostname='powerdns.com')
+        sslsock.connect(("127.0.0.1", self._tlsServerPort))
+
+        self.sendTCPQueryOverConnection(sslsock, query, response=None)
+        receivedResponse = self.recvTCPResponseOverConnection(sslsock, useQueue=False)
+        self.assertTrue(receivedResponse)
+        self.assertEquals(expectedResponse, receivedResponse)
+        self.assertFalse(sslsock.session_reused)
+        session = sslsock.session
+
+        # this one should not (different SNI)
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        sock.settimeout(2.0)
+        sslsock = sslctx.wrap_socket(sock, server_hostname=self._serverName)
+        sslsock.connect(("127.0.0.1", self._tlsServerPort))
+
+        self.sendTCPQueryOverConnection(sslsock, query, response=response)
+        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(sslsock, useQueue=True)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEquals(query, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+        self.assertFalse(sslsock.session_reused)
+
+        # and now we should be able to resume the session
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        sock.settimeout(2.0)
+        sslsock = sslctx.wrap_socket(sock, server_hostname='powerdns.com')
+        sslsock.session = session
+        sslsock.connect(("127.0.0.1", self._tlsServerPort))
+
+        self.sendTCPQueryOverConnection(sslsock, query, response=None)
+        receivedResponse = self.recvTCPResponseOverConnection(sslsock, useQueue=False)
+        self.assertTrue(receivedResponse)
+        self.assertEquals(expectedResponse, receivedResponse)
+        self.assertTrue(sslsock.session_reused)
+
+class TestOpenSSL(DNSDistTest, TLSTests):
+
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _tlsServerPort = 8453
+    _config_template = """
+    newServer{address="127.0.0.1:%s"}
+    addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" })
+    addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
+    """
+    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+
+class TestGnuTLS(DNSDistTest, TLSTests):
+
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _tlsServerPort = 8453
+    _config_template = """
+    newServer{address="127.0.0.1:%s"}
+    addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls" })
+    addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
+    """
+    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
+
 class TestDOTWithCache(DNSDistTest):
 
     _serverKey = 'server.key'