self._ssl_options,
server_hostname=self._server_hostname,
do_handshake_on_connect=False,
+ server_side=False,
)
self._add_io_state(old_state)
def ssl_options_to_context(
- ssl_options: Union[Dict[str, Any], ssl.SSLContext]
+ ssl_options: Union[Dict[str, Any], ssl.SSLContext],
+ server_side: Optional[bool] = None,
) -> ssl.SSLContext:
"""Try to convert an ``ssl_options`` dictionary to an
`~ssl.SSLContext` object.
`~ssl.SSLContext` equivalent, and may be used when a component which
accepts both forms needs to upgrade to the `~ssl.SSLContext` version
to use features like SNI or NPN.
+
+ .. versionchanged:: 6.2
+
+ Added server_side argument. Omitting this argument will
+ result in a DeprecationWarning on Python 3.10.
+
"""
if isinstance(ssl_options, ssl.SSLContext):
return ssl_options
assert isinstance(ssl_options, dict)
assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options
- # Can't use create_default_context since this interface doesn't
- # tell us client vs server.
- context = ssl.SSLContext(ssl_options.get("ssl_version", ssl.PROTOCOL_SSLv23))
+ # TODO: Now that we have the server_side argument, can we switch to
+ # create_default_context or would that change behavior?
+ default_version = ssl.PROTOCOL_TLS
+ if server_side:
+ default_version = ssl.PROTOCOL_TLS_SERVER
+ elif server_side is not None:
+ default_version = ssl.PROTOCOL_TLS_CLIENT
+ context = ssl.SSLContext(ssl_options.get("ssl_version", default_version))
if "certfile" in ssl_options:
context.load_cert_chain(
ssl_options["certfile"], ssl_options.get("keyfile", None)
)
if "cert_reqs" in ssl_options:
+ if ssl_options["cert_reqs"] == ssl.CERT_NONE:
+ # This may have been set automatically by PROTOCOL_TLS_CLIENT but is
+ # incompatible with CERT_NONE so we must manually clear it.
+ context.check_hostname = False
context.verify_mode = ssl_options["cert_reqs"]
if "ca_certs" in ssl_options:
context.load_verify_locations(ssl_options["ca_certs"])
socket: socket.socket,
ssl_options: Union[Dict[str, Any], ssl.SSLContext],
server_hostname: Optional[str] = None,
+ server_side: Optional[bool] = None,
**kwargs: Any
) -> ssl.SSLSocket:
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
keyword arguments are passed to ``wrap_socket`` (either the
`~ssl.SSLContext` method or the `ssl` module function as
appropriate).
+
+ .. versionchanged:: 6.2
+
+ Added server_side argument. Omitting this argument will
+ result in a DeprecationWarning on Python 3.10.
"""
- context = ssl_options_to_context(ssl_options)
+ context = ssl_options_to_context(ssl_options, server_side=server_side)
+ if server_side is None:
+ server_side = False
if ssl.HAS_SNI:
# In python 3.4, wrap_socket only accepts the server_hostname
# argument if HAS_SNI is true.
# TODO: add a unittest (python added server-side SNI support in 3.4)
# In the meantime it can be manually tested with
# python3 -m tornado.httpclient https://sni.velox.ch
- return context.wrap_socket(socket, server_hostname=server_hostname, **kwargs)
+ return context.wrap_socket(
+ socket, server_hostname=server_hostname, server_side=server_side, **kwargs
+ )
else:
- return context.wrap_socket(socket, **kwargs)
+ return context.wrap_socket(socket, server_side=server_side, **kwargs)
class SSLContextTest(BaseSSLTest, SSLTestMixin):
def get_ssl_options(self):
- context = ssl_options_to_context(AsyncHTTPSTestCase.get_ssl_options(self))
+ context = ssl_options_to_context(
+ AsyncHTTPSTestCase.get_ssl_options(self), server_side=True
+ )
assert isinstance(context, ssl.SSLContext)
return context
from tornado.httputil import HTTPHeaders
from tornado.locks import Condition, Event
from tornado.log import gen_log
-from tornado.netutil import ssl_wrap_socket
+from tornado.netutil import ssl_options_to_context, ssl_wrap_socket
from tornado.platform.asyncio import AddThreadSelectorEventLoop
from tornado.tcpserver import TCPServer
from tornado.testing import (
ExpectLog,
gen_test,
)
-from tornado.test.util import skipIfNonUnix, refusing_port, skipPypy3V58
+from tornado.test.util import (
+ skipIfNonUnix,
+ refusing_port,
+ skipPypy3V58,
+ ignore_deprecation,
+)
from tornado.web import RequestHandler, Application
import asyncio
import errno
class TestIOStreamSSL(TestIOStreamMixin, AsyncTestCase):
def _make_server_iostream(self, connection, **kwargs):
- connection = ssl.wrap_socket(
+ ssl_ctx = ssl_options_to_context(_server_ssl_options(), server_side=True)
+ connection = ssl_ctx.wrap_socket(
connection,
server_side=True,
do_handshake_on_connect=False,
- **_server_ssl_options()
)
return SSLIOStream(connection, **kwargs)
# instead of an ssl_options dict to the SSLIOStream constructor.
class TestIOStreamSSLContext(TestIOStreamMixin, AsyncTestCase):
def _make_server_iostream(self, connection, **kwargs):
- context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(
os.path.join(os.path.dirname(__file__), "test.crt"),
os.path.join(os.path.dirname(__file__), "test.key"),
return SSLIOStream(connection, **kwargs)
def _make_client_iostream(self, connection, **kwargs):
- context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
+ context.check_hostname = False
+ context.verify_mode = ssl.CERT_NONE
return SSLIOStream(connection, ssl_options=context, **kwargs)
# to openssl 1.1.c. Other platforms might be affected with
# newer openssl too). Disable it until we figure out
# what's up.
- ssl_ctx.options |= getattr(ssl, "OP_NO_TLSv1_3", 0)
- client = SSLIOStream(socket.socket(), ssl_options=ssl_ctx)
+ # Update 2021-12-28: Still happening with Python 3.10 on
+ # Windows. OP_NO_TLSv1_3 now raises a DeprecationWarning.
+ with ignore_deprecation():
+ ssl_ctx.options |= getattr(ssl, "OP_NO_TLSv1_3", 0)
+ client = SSLIOStream(socket.socket(), ssl_options=ssl_ctx)
yield client.connect(("127.0.0.1", port))
self.assertIsNotNone(client.socket.cipher())
finally:
)
def test_ssl_options(self):
- resp = self.fetch("/hello", ssl_options={})
+ resp = self.fetch("/hello", ssl_options={"cert_reqs": ssl.CERT_NONE})
self.assertEqual(resp.body, b"Hello world!")
def test_ssl_context(self):
- resp = self.fetch("/hello", ssl_options=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
+ ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
+ ssl_ctx.check_hostname = False
+ ssl_ctx.verify_mode = ssl.CERT_NONE
+ resp = self.fetch("/hello", ssl_options=ssl_ctx)
self.assertEqual(resp.body, b"Hello world!")
def test_ssl_options_handshake_fail(self):
def test_ssl_context_handshake_fail(self):
with ExpectLog(gen_log, "SSL Error|Uncaught exception"):
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
+ # CERT_REQUIRED is set by default.
+ ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
with self.assertRaises(ssl.SSLError):
self.fetch("/hello", ssl_options=ctx, raise_error=True)