From: Remi Gacogne Date: Thu, 7 Jan 2021 16:41:00 +0000 (+0100) Subject: dnsdist: Add a unit test for SNI routing after session resumption X-Git-Tag: rec-4.5.0-alpha1~31^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F9921%2Fhead;p=thirdparty%2Fpdns.git dnsdist: Add a unit test for SNI routing after session resumption --- diff --git a/regression-tests.dnsdist/test_TLS.py b/regression-tests.dnsdist/test_TLS.py index 48de31e67f..f2c8a9ba83 100644 --- a/regression-tests.dnsdist/test_TLS.py +++ b/regression-tests.dnsdist/test_TLS.py @@ -1,20 +1,11 @@ #!/usr/bin/env python import dns +import socket +import ssl +import unittest from dnsdisttests import DNSDistTest -class TestTLS(DNSDistTest): - - _serverKey = 'server.key' - _serverCert = 'server.chain' - _serverName = 'tls.tests.dnsdist.org' - _caCert = 'ca.pem' - _tlsServerPort = 8453 - _config_template = """ - newServer{address="127.0.0.1:%s"} - addTLSLocal("127.0.0.1:%s", "%s", "%s") - addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4")) - """ - _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey'] +class TLSTests(object): def testTLSSimple(self): """ @@ -133,6 +124,107 @@ class TestTLS(DNSDistTest): self.assertEquals(query, receivedQuery) self.assertEquals(response, receivedResponse) + def testTLSSNIRoutingAfterResumption(self): + # we have more complicated tests about session resumption itself, + # but here we want to make sure the SNI is still present after resumption + """ + TLS: SNI Routing after resumption + """ + name = 'sni-resumed.tls.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN', use_edns=False) + query.flags &= ~dns.flags.RD + response = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 3600, + dns.rdataclass.IN, + dns.rdatatype.A, + '127.0.0.1') + response.answer.append(rrset) + expectedResponse = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 3600, + dns.rdataclass.IN, + dns.rdatatype.A, + '1.2.3.4') + expectedResponse.answer.append(rrset) + + # this SNI should match so we should get a spoofed answer + sslctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1_2) + sslctx.check_hostname = True + sslctx.verify_mode = ssl.CERT_REQUIRED + sslctx.load_verify_locations(self._caCert) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.settimeout(2.0) + sslsock = sslctx.wrap_socket(sock, server_hostname='powerdns.com') + sslsock.connect(("127.0.0.1", self._tlsServerPort)) + + self.sendTCPQueryOverConnection(sslsock, query, response=None) + receivedResponse = self.recvTCPResponseOverConnection(sslsock, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) + self.assertFalse(sslsock.session_reused) + session = sslsock.session + + # this one should not (different SNI) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.settimeout(2.0) + sslsock = sslctx.wrap_socket(sock, server_hostname=self._serverName) + sslsock.connect(("127.0.0.1", self._tlsServerPort)) + + self.sendTCPQueryOverConnection(sslsock, query, response=response) + (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(sslsock, useQueue=True) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(response, receivedResponse) + self.assertFalse(sslsock.session_reused) + + # and now we should be able to resume the session + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.settimeout(2.0) + sslsock = sslctx.wrap_socket(sock, server_hostname='powerdns.com') + sslsock.session = session + sslsock.connect(("127.0.0.1", self._tlsServerPort)) + + self.sendTCPQueryOverConnection(sslsock, query, response=None) + receivedResponse = self.recvTCPResponseOverConnection(sslsock, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) + self.assertTrue(sslsock.session_reused) + +class TestOpenSSL(DNSDistTest, TLSTests): + + _serverKey = 'server.key' + _serverCert = 'server.chain' + _serverName = 'tls.tests.dnsdist.org' + _caCert = 'ca.pem' + _tlsServerPort = 8453 + _config_template = """ + newServer{address="127.0.0.1:%s"} + addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl" }) + addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4")) + """ + _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey'] + +class TestGnuTLS(DNSDistTest, TLSTests): + + _serverKey = 'server.key' + _serverCert = 'server.chain' + _serverName = 'tls.tests.dnsdist.org' + _caCert = 'ca.pem' + _tlsServerPort = 8453 + _config_template = """ + newServer{address="127.0.0.1:%s"} + addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls" }) + addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4")) + """ + _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey'] + class TestDOTWithCache(DNSDistTest): _serverKey = 'server.key'