# Test the support for SSL and sockets
+import contextlib
import sys
import unittest
import unittest.mock
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = socket_helper.HOST
+IS_AWS_LC = "AWS-LC" in ssl.OPENSSL_VERSION
IS_OPENSSL_3_0_0 = ssl.OPENSSL_VERSION_INFO >= (3, 0, 0)
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
)
-def test_wrap_socket(sock, *,
- cert_reqs=ssl.CERT_NONE, ca_certs=None,
- ciphers=None, certfile=None, keyfile=None,
- **kwargs):
- if not kwargs.get("server_side"):
- kwargs["server_hostname"] = SIGNED_CERTFILE_HOSTNAME
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- else:
+def make_test_context(
+ *,
+ server_side=False,
+ check_hostname=None,
+ cert_reqs=ssl.CERT_NONE,
+ ca_certs=None, certfile=None, keyfile=None,
+ ciphers=None,
+):
+ if server_side:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- if cert_reqs is not None:
+ else:
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ if check_hostname is None:
if cert_reqs == ssl.CERT_NONE:
context.check_hostname = False
+ else:
+ context.check_hostname = check_hostname
+ if cert_reqs is not None:
context.verify_mode = cert_reqs
if ca_certs is not None:
context.load_verify_locations(ca_certs)
context.load_cert_chain(certfile, keyfile)
if ciphers is not None:
context.set_ciphers(ciphers)
- return context.wrap_socket(sock, **kwargs)
+ return context
+
+
+def test_wrap_socket(
+ sock,
+ *,
+ server_side=False,
+ check_hostname=None,
+ cert_reqs=ssl.CERT_NONE,
+ ca_certs=None, certfile=None, keyfile=None,
+ ciphers=None,
+ **kwargs,
+):
+ context = make_test_context(
+ server_side=server_side,
+ check_hostname=check_hostname,
+ cert_reqs=cert_reqs,
+ ca_certs=ca_certs, certfile=certfile, keyfile=keyfile,
+ ciphers=ciphers,
+ )
+ if not server_side:
+ kwargs.setdefault("server_hostname", SIGNED_CERTFILE_HOSTNAME)
+ return context.wrap_socket(sock, server_side=server_side, **kwargs)
USE_SAME_TEST_CONTEXT = False
return client_context, server_context, hostname
+def do_ssl_object_handshake(sslobject, outgoing, max_retry=25):
+ """Call do_handshake() on the sslobject and return the sent data.
+
+ If do_handshake() fails more than *max_retry* times, return None.
+ """
+ data, attempt = None, 0
+ while not data and attempt < max_retry:
+ with contextlib.suppress(ssl.SSLWantReadError):
+ sslobject.do_handshake()
+ data = outgoing.read()
+ attempt += 1
+ return data
+
+
class BasicSocketTests(unittest.TestCase):
def test_constants(self):
ctx.set_servername_callback(None)
ctx.set_servername_callback(dummycallback)
+ def test_sni_callback_on_dead_references(self):
+ # See https://github.com/python/cpython/issues/146080.
+ c_ctx = make_test_context()
+ c_inc, c_out = ssl.MemoryBIO(), ssl.MemoryBIO()
+ client = c_ctx.wrap_bio(c_inc, c_out, server_hostname=SIGNED_CERTFILE_HOSTNAME)
+
+ def sni_callback(sock, servername, ctx): pass
+ sni_callback = unittest.mock.Mock(wraps=sni_callback)
+ s_ctx = make_test_context(server_side=True, certfile=SIGNED_CERTFILE)
+ s_ctx.set_servername_callback(sni_callback)
+
+ s_inc, s_out = ssl.MemoryBIO(), ssl.MemoryBIO()
+ server = s_ctx.wrap_bio(s_inc, s_out, server_side=True)
+ server_impl = server._sslobj
+
+ # Perform the handshake on the client side first.
+ data = do_ssl_object_handshake(client, c_out)
+ sni_callback.assert_not_called()
+ if data is None:
+ self.skipTest("cannot establish a handshake from the client")
+ s_inc.write(data)
+ sni_callback.assert_not_called()
+ # Delete the server object before it starts doing its handshake
+ # and ensure that we did not call the SNI callback yet.
+ del server
+ gc.collect()
+ # Try to continue the server's handshake by directly using
+ # the internal SSL object. The latter is a weak reference
+ # stored in the server context and has now a dead owner.
+ with self.assertRaises(ssl.SSLError) as cm:
+ server_impl.do_handshake()
+ # The SNI C callback raised an exception before calling our callback.
+ sni_callback.assert_not_called()
+
+ # In AWS-LC, any handshake failures reports SSL_R_PARSE_TLSEXT,
+ # while OpenSSL uses SSL_R_CALLBACK_FAILED on SNI callback failures.
+ if IS_AWS_LC:
+ libssl_error_reason = "PARSE_TLSEXT"
+ else:
+ libssl_error_reason = "callback failed"
+ self.assertIn(libssl_error_reason, str(cm.exception))
+ self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_SSL)
+
def test_sni_callback_refcycle(self):
# Reference cycles through the servername callback are detected
# and cleared.