import numbers
import os
import socket
+import ssl
import sys
import re
-import warnings
from tornado.concurrent import Future
from tornado import ioloop
except ImportError:
_set_nonblocking = None
-try:
- import ssl
-except ImportError:
- # ssl is not available on Google App Engine
- ssl = None
-
# These errnos indicate that a non-blocking operation must be retried
# at a later time. On most platforms they're the same value, but on
# some they differ.
class BaseIOStream(object):
"""A utility class to write to and read from a non-blocking file or socket.
- We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
- All of the methods take an optional ``callback`` argument and return a
- `.Future` only if no callback is given. When the operation completes,
- the callback will be run or the `.Future` will resolve with the data
- read (or ``None`` for ``write()``). All outstanding ``Futures`` will
- resolve with a `StreamClosedError` when the stream is closed; users
- of the callback interface will be notified via
- `.BaseIOStream.set_close_callback` instead.
+ We support a non-blocking ``write()`` and a family of ``read_*()``
+ methods. When the operation completes, the `.Future` will resolve
+ with the data read (or ``None`` for ``write()``). All outstanding
+ ``Futures`` will resolve with a `StreamClosedError` when the
+ stream is closed; `.BaseIOStream.set_close_callback` can also be used
+ to be notified of a closed stream.
When a stream is closed due to an error, the IOStream's ``error``
attribute contains the exception object.
Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
`read_from_fd`, and optionally `get_fd_error`.
+
"""
def __init__(self, max_buffer_size=None,
read_chunk_size=None, max_write_buffer_size=None):
self._read_bytes = None
self._read_partial = False
self._read_until_close = False
- self._read_callback = None
self._read_future = None
- self._streaming_callback = None
- self._write_callback = None
self._write_futures = collections.deque()
self._close_callback = None
- self._connect_callback = None
self._connect_future = None
# _ssl_connect_future should be defined in SSLIOStream
- # but it's here so we can clean it up in maybe_run_close_callback.
+ # but it's here so we can clean it up in _signal_closed.
# TODO: refactor that so subclasses can add additional futures
# to be cancelled.
self._ssl_connect_future = None
self._connecting = False
self._state = None
- self._pending_callbacks = 0
self._closed = False
def fileno(self):
"""
return None
- def read_until_regex(self, regex, callback=None, max_bytes=None):
+ def read_until_regex(self, regex, max_bytes=None):
"""Asynchronously read until we have matched the given regex.
The result includes the data that matches the regex and anything
- that came before it. If a callback is given, it will be run
- with the data as an argument; if not, this method returns a
- `.Future`.
+ that came before it.
If ``max_bytes`` is not None, the connection will be closed
if more than ``max_bytes`` bytes have been read and the regex is
Added the ``max_bytes`` argument. The ``callback`` argument is
now optional and a `.Future` will be returned if it is omitted.
- .. deprecated:: 5.1
+ .. versionchanged:: 6.0
- The ``callback`` argument is deprecated and will be removed
- in Tornado 6.0. Use the returned `.Future` instead.
+ The ``callback`` argument was removed. Use the returned
+ `.Future` instead.
"""
- future = self._set_read_callback(callback)
+ future = self._start_read()
self._read_regex = re.compile(regex)
self._read_max_bytes = max_bytes
try:
self.close(exc_info=e)
return future
except:
- if future is not None:
- # Ensure that the future doesn't log an error because its
- # failure was never examined.
- future.add_done_callback(lambda f: f.exception())
+ # Ensure that the future doesn't log an error because its
+ # failure was never examined.
+ future.add_done_callback(lambda f: f.exception())
raise
return future
- def read_until(self, delimiter, callback=None, max_bytes=None):
+ def read_until(self, delimiter, max_bytes=None):
"""Asynchronously read until we have found the given delimiter.
The result includes all the data read including the delimiter.
- If a callback is given, it will be run with the data as an argument;
- if not, this method returns a `.Future`.
If ``max_bytes`` is not None, the connection will be closed
if more than ``max_bytes`` bytes have been read and the delimiter
Added the ``max_bytes`` argument. The ``callback`` argument is
now optional and a `.Future` will be returned if it is omitted.
- .. deprecated:: 5.1
+ .. versionchanged:: 6.0
- The ``callback`` argument is deprecated and will be removed
- in Tornado 6.0. Use the returned `.Future` instead.
+ The ``callback`` argument was removed. Use the returned
+ `.Future` instead.
"""
- future = self._set_read_callback(callback)
+ future = self._start_read()
self._read_delimiter = delimiter
self._read_max_bytes = max_bytes
try:
self.close(exc_info=e)
return future
except:
- if future is not None:
- future.add_done_callback(lambda f: f.exception())
+ future.add_done_callback(lambda f: f.exception())
raise
return future
- def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
- partial=False):
+ def read_bytes(self, num_bytes, partial=False):
"""Asynchronously read a number of bytes.
- If a ``streaming_callback`` is given, it will be called with chunks
- of data as they become available, and the final result will be empty.
- Otherwise, the result is all the data that was read.
- If a callback is given, it will be run with the data as an argument;
- if not, this method returns a `.Future`.
-
- If ``partial`` is true, the callback is run as soon as we have
+ If ``partial`` is true, data is returned as soon as we have
any bytes to return (but never more than ``num_bytes``)
.. versionchanged:: 4.0
Added the ``partial`` argument. The callback argument is now
optional and a `.Future` will be returned if it is omitted.
- .. deprecated:: 5.1
+ .. versionchanged:: 6.0
- The ``callback`` and ``streaming_callback`` arguments are
- deprecated and will be removed in Tornado 6.0. Use the
- returned `.Future` (and ``partial=True`` for
- ``streaming_callback``) instead.
+ The ``callback`` and ``streaming_callback`` arguments have
+ been removed. Use the returned `.Future` (and
+ ``partial=True`` for ``streaming_callback``) instead.
"""
- future = self._set_read_callback(callback)
+ future = self._start_read()
assert isinstance(num_bytes, numbers.Integral)
self._read_bytes = num_bytes
self._read_partial = partial
- if streaming_callback is not None:
- warnings.warn("streaming_callback is deprecated, use partial instead",
- DeprecationWarning)
- self._streaming_callback = stack_context.wrap(streaming_callback)
try:
self._try_inline_read()
except:
- if future is not None:
- future.add_done_callback(lambda f: f.exception())
+ future.add_done_callback(lambda f: f.exception())
raise
return future
- def read_into(self, buf, callback=None, partial=False):
+ def read_into(self, buf, partial=False):
"""Asynchronously read a number of bytes.
``buf`` must be a writable buffer into which data will be read.
- If a callback is given, it will be run with the number of read
- bytes as an argument; if not, this method returns a `.Future`.
If ``partial`` is true, data is returned as soon as any bytes
have been read. Otherwise, the `.Future` resolves when the ``buf`` has been
.. versionadded:: 5.0
- .. deprecated:: 5.1
+ .. versionchanged:: 6.0
- The ``callback`` argument is deprecated and will be removed
- in Tornado 6.0. Use the returned `.Future` instead.
+ The ``callback`` argument was removed. Use the returned
+ `.Future` instead.
"""
- future = self._set_read_callback(callback)
+ future = self._start_read()
# First copy data already in read buffer
available_bytes = self._read_buffer_size
try:
self._try_inline_read()
except:
- if future is not None:
- future.add_done_callback(lambda f: f.exception())
+ future.add_done_callback(lambda f: f.exception())
raise
return future
- def read_until_close(self, callback=None, streaming_callback=None):
+ def read_until_close(self):
"""Asynchronously reads all data from the socket until it is closed.
- If a ``streaming_callback`` is given, it will be called with chunks
- of data as they become available, and the final result will be empty.
- Otherwise, the result is all the data that was read.
- If a callback is given, it will be run with the data as an argument;
- if not, this method returns a `.Future`.
-
- Note that if a ``streaming_callback`` is used, data will be
- read from the socket as quickly as it becomes available; there
- is no way to apply backpressure or cancel the reads. If flow
- control or cancellation are desired, use a loop with
- `read_bytes(partial=True) <.read_bytes>` instead.
+ This will buffer all available data until ``max_buffer_size``
+ is reached. If flow control or cancellation are desired, use a
+ loop with `read_bytes(partial=True) <.read_bytes>` instead.
.. versionchanged:: 4.0
The callback argument is now optional and a `.Future` will
be returned if it is omitted.
- .. deprecated:: 5.1
+ .. versionchanged:: 6.0
- The ``callback`` and ``streaming_callback`` arguments are
- deprecated and will be removed in Tornado 6.0. Use the
- returned `.Future` (and `read_bytes` with ``partial=True``
- for ``streaming_callback``) instead.
+ The ``callback`` and ``streaming_callback`` arguments have
+ been removed. Use the returned `.Future` (and `read_bytes`
+ with ``partial=True`` for ``streaming_callback``) instead.
"""
- future = self._set_read_callback(callback)
- if streaming_callback is not None:
- warnings.warn("streaming_callback is deprecated, use read_bytes(partial=True) instead",
- DeprecationWarning)
- self._streaming_callback = stack_context.wrap(streaming_callback)
+ future = self._start_read()
if self.closed():
- if self._streaming_callback is not None:
- self._run_read_callback(self._read_buffer_size, True)
- self._run_read_callback(self._read_buffer_size, False)
+ self._finish_read(self._read_buffer_size, False)
return future
self._read_until_close = True
try:
self._try_inline_read()
except:
- if future is not None:
- future.add_done_callback(lambda f: f.exception())
+ future.add_done_callback(lambda f: f.exception())
raise
return future
- def write(self, data, callback=None):
+ def write(self, data):
"""Asynchronously write the given data to this stream.
- If ``callback`` is given, we call it when all of the buffered write
- data has been successfully written to the stream. If there was
- previously buffered write data and an old write callback, that
- callback is simply overwritten with this new callback.
-
- If no ``callback`` is given, this method returns a `.Future` that
- resolves (with a result of ``None``) when the write has been
- completed.
+ This method returns a `.Future` that resolves (with a result
+ of ``None``) when the write has been completed.
The ``data`` argument may be of type `bytes` or `memoryview`.
.. versionchanged:: 4.5
Added support for `memoryview` arguments.
- .. deprecated:: 5.1
+ .. versionchanged:: 6.0
- The ``callback`` argument is deprecated and will be removed
- in Tornado 6.0. Use the returned `.Future` instead.
+ The ``callback`` argument was removed. Use the returned
+ `.Future` instead.
"""
self._check_closed()
raise StreamBufferFullError("Reached maximum write buffer size")
self._write_buffer.append(data)
self._total_write_index += len(data)
- if callback is not None:
- warnings.warn("callback argument is deprecated, use returned Future instead",
- DeprecationWarning)
- self._write_callback = stack_context.wrap(callback)
- future = None
- else:
- future = Future()
- future.add_done_callback(lambda f: f.exception())
- self._write_futures.append((self._total_write_index, future))
+ future = Future()
+ future.add_done_callback(lambda f: f.exception())
+ self._write_futures.append((self._total_write_index, future))
if not self._connecting:
self._handle_write()
if self._write_buffer:
closed while no other read or write is in progress.
Unlike other callback-based interfaces, ``set_close_callback``
- will not be removed in Tornado 6.0.
+ was not removed in Tornado 6.0.
"""
self._close_callback = stack_context.wrap(callback)
self._maybe_add_error_listener()
if any(exc_info):
self.error = exc_info[1]
if self._read_until_close:
- if (self._streaming_callback is not None and
- self._read_buffer_size):
- self._run_read_callback(self._read_buffer_size, True)
self._read_until_close = False
- self._run_read_callback(self._read_buffer_size, False)
+ self._finish_read(self._read_buffer_size, False)
if self._state is not None:
self.io_loop.remove_handler(self.fileno())
self._state = None
self.close_fd()
self._closed = True
- self._maybe_run_close_callback()
-
- def _maybe_run_close_callback(self):
- # If there are pending callbacks, don't run the close callback
- # until they're done (see _maybe_add_error_handler)
- if self.closed() and self._pending_callbacks == 0:
- futures = []
- if self._read_future is not None:
- futures.append(self._read_future)
- self._read_future = None
- futures += [future for _, future in self._write_futures]
- self._write_futures.clear()
- if self._connect_future is not None:
- futures.append(self._connect_future)
- self._connect_future = None
- if self._ssl_connect_future is not None:
- futures.append(self._ssl_connect_future)
- self._ssl_connect_future = None
- for future in futures:
- future.set_exception(StreamClosedError(real_error=self.error))
- future.exception()
- if self._close_callback is not None:
- cb = self._close_callback
- self._close_callback = None
- self._run_callback(cb)
- # Delete any unfinished callbacks to break up reference cycles.
- self._read_callback = self._write_callback = None
- # Clear the buffers so they can be cleared immediately even
- # if the IOStream object is kept alive by a reference cycle.
- # TODO: Clear the read buffer too; it currently breaks some tests.
- self._write_buffer = None
+ self._signal_closed()
+
+ def _signal_closed(self):
+ """Resolve all pending futures and run the close callback now that the stream is closed."""
+ futures = []
+ if self._read_future is not None:
+ futures.append(self._read_future)
+ self._read_future = None
+ futures += [future for _, future in self._write_futures]
+ self._write_futures.clear()
+ if self._connect_future is not None:
+ futures.append(self._connect_future)
+ self._connect_future = None
+ # Every outstanding Future resolves with StreamClosedError wrapping the real error.
+ for future in futures:
+ future.set_exception(StreamClosedError(real_error=self.error))
+ # Calling exception() marks the error as observed so the Future does not log it.
+ future.exception()
+ if self._ssl_connect_future is not None:
+ # _ssl_connect_future expects to see the real exception (typically
+ # an ssl.SSLError), not just StreamClosedError.
+ self._ssl_connect_future.set_exception(self.error)
+ self._ssl_connect_future.exception()
+ self._ssl_connect_future = None
+ if self._close_callback is not None:
+ cb = self._close_callback
+ self._close_callback = None
+ # Schedule the user close callback on the IOLoop rather than running it synchronously.
+ self.io_loop.add_callback(cb)
+ # Clear the buffers so they can be cleared immediately even
+ # if the IOStream object is kept alive by a reference cycle.
+ # TODO: Clear the read buffer too; it currently breaks some tests.
+ self._write_buffer = None
def reading(self):
"""Returns true if we are currently reading from the stream."""
- return self._read_callback is not None or self._read_future is not None
+ return self._read_future is not None
def writing(self):
"""Returns true if we are currently writing to the stream."""
self.close(exc_info=e)
raise
- def _run_callback(self, callback, *args):
- def wrapper():
- self._pending_callbacks -= 1
- try:
- return callback(*args)
- except Exception as e:
- app_log.error("Uncaught exception, closing connection.",
- exc_info=True)
- # Close the socket on an uncaught exception from a user callback
- # (It would eventually get closed when the socket object is
- # gc'd, but we don't want to rely on gc happening before we
- # run out of file descriptors)
- self.close(exc_info=e)
- # Re-raise the exception so that IOLoop.handle_callback_exception
- # can see it and log the error
- raise
- finally:
- self._maybe_add_error_listener()
- # We schedule callbacks to be run on the next IOLoop iteration
- # rather than running them directly for several reasons:
- # * Prevents unbounded stack growth when a callback calls an
- # IOLoop operation that immediately runs another callback
- # * Provides a predictable execution context for e.g.
- # non-reentrant mutexes
- # * Ensures that the try/except in wrapper() is run outside
- # of the application's StackContexts
- with stack_context.NullContext():
- # stack_context was already captured in callback, we don't need to
- # capture it again for IOStream's wrapper. This is especially
- # important if the callback was pre-wrapped before entry to
- # IOStream (as in HTTPConnection._header_callback), as we could
- # capture and leak the wrong context here.
- self._pending_callbacks += 1
- self.io_loop.add_callback(wrapper)
-
def _read_to_buffer_loop(self):
# This method is called from _handle_read and _try_inline_read.
- try:
- if self._read_bytes is not None:
- target_bytes = self._read_bytes
- elif self._read_max_bytes is not None:
- target_bytes = self._read_max_bytes
- elif self.reading():
- # For read_until without max_bytes, or
- # read_until_close, read as much as we can before
- # scanning for the delimiter.
- target_bytes = None
- else:
- target_bytes = 0
- next_find_pos = 0
- # Pretend to have a pending callback so that an EOF in
- # _read_to_buffer doesn't trigger an immediate close
- # callback. At the end of this method we'll either
- # establish a real pending callback via
- # _read_from_buffer or run the close callback.
- #
- # We need two try statements here so that
- # pending_callbacks is decremented before the `except`
- # clause below (which calls `close` and does need to
- # trigger the callback)
- self._pending_callbacks += 1
- while not self.closed():
- # Read from the socket until we get EWOULDBLOCK or equivalent.
- # SSL sockets do some internal buffering, and if the data is
- # sitting in the SSL object's buffer select() and friends
- # can't see it; the only way to find out if it's there is to
- # try to read it.
- if self._read_to_buffer() == 0:
- break
+ if self._read_bytes is not None:
+ target_bytes = self._read_bytes
+ elif self._read_max_bytes is not None:
+ target_bytes = self._read_max_bytes
+ elif self.reading():
+ # For read_until without max_bytes, or
+ # read_until_close, read as much as we can before
+ # scanning for the delimiter.
+ target_bytes = None
+ else:
+ target_bytes = 0
+ next_find_pos = 0
+ while not self.closed():
+ # Read from the socket until we get EWOULDBLOCK or equivalent.
+ # SSL sockets do some internal buffering, and if the data is
+ # sitting in the SSL object's buffer select() and friends
+ # can't see it; the only way to find out if it's there is to
+ # try to read it.
+ if self._read_to_buffer() == 0:
+ break
- self._run_streaming_callback()
+ # If we've read all the bytes we can use, break out of
+ # this loop.
- # If we've read all the bytes we can use, break out of
- # this loop. We can't just call read_from_buffer here
- # because of subtle interactions with the
- # pending_callback and error_listener mechanisms.
- #
- # If we've reached target_bytes, we know we're done.
- if (target_bytes is not None and
- self._read_buffer_size >= target_bytes):
- break
+ # If we've reached target_bytes, we know we're done.
+ if (target_bytes is not None and
+ self._read_buffer_size >= target_bytes):
+ break
- # Otherwise, we need to call the more expensive find_read_pos.
- # It's inefficient to do this on every read, so instead
- # do it on the first read and whenever the read buffer
- # size has doubled.
- if self._read_buffer_size >= next_find_pos:
- pos = self._find_read_pos()
- if pos is not None:
- return pos
- next_find_pos = self._read_buffer_size * 2
- return self._find_read_pos()
- finally:
- self._pending_callbacks -= 1
+ # Otherwise, we need to call the more expensive find_read_pos.
+ # It's inefficient to do this on every read, so instead
+ # do it on the first read and whenever the read buffer
+ # size has doubled.
+ if self._read_buffer_size >= next_find_pos:
+ pos = self._find_read_pos()
+ if pos is not None:
+ return pos
+ next_find_pos = self._read_buffer_size * 2
+ return self._find_read_pos()
def _handle_read(self):
try:
return
if pos is not None:
self._read_from_buffer(pos)
- return
- else:
- self._maybe_run_close_callback()
- def _set_read_callback(self, callback):
- assert self._read_callback is None, "Already reading"
+ def _start_read(self):
+ # Set up and return the Future that will resolve with the result of this read.
assert self._read_future is None, "Already reading"
- if callback is not None:
- warnings.warn("callbacks are deprecated, use returned Future instead",
- DeprecationWarning)
- self._read_callback = stack_context.wrap(callback)
- else:
- self._read_future = Future()
+ self._read_future = Future()
return self._read_future
- def _run_read_callback(self, size, streaming):
+ def _finish_read(self, size, streaming):
+ # Deliver the first ``size`` bytes of the read buffer to the pending read Future.
if self._user_read_buffer:
self._read_buffer = self._after_user_read_buffer or bytearray()
self._after_user_read_buffer = None
result = size
else:
result = self._consume(size)
- if streaming:
- callback = self._streaming_callback
- else:
- callback = self._read_callback
- self._read_callback = self._streaming_callback = None
- if self._read_future is not None:
- assert callback is None
- future = self._read_future
- self._read_future = None
-
- future.set_result(result)
- if callback is not None:
- assert (self._read_future is None) or streaming
- self._run_callback(callback, result)
- else:
- # If we scheduled a callback, we will add the error listener
- # afterwards. If we didn't, we have to do it now.
- self._maybe_add_error_listener()
+ if self._read_future is not None:
+ future = self._read_future
+ self._read_future = None
+ future.set_result(result)
+ self._maybe_add_error_listener()
def _try_inline_read(self):
"""Attempt to complete the current read operation from buffered data.
listening for reads on the socket.
"""
# See if we've already got the data from a previous read
- self._run_streaming_callback()
pos = self._find_read_pos()
if pos is not None:
self._read_from_buffer(pos)
return
self._check_closed()
- try:
- pos = self._read_to_buffer_loop()
- except Exception:
- # If there was an in _read_to_buffer, we called close() already,
- # but couldn't run the close callback because of _pending_callbacks.
- # Before we escape from this function, run the close callback if
- # applicable.
- self._maybe_run_close_callback()
- raise
+ pos = self._read_to_buffer_loop()
if pos is not None:
self._read_from_buffer(pos)
return
- # We couldn't satisfy the read inline, so either close the stream
- # or listen for new data.
- if self.closed():
- self._maybe_run_close_callback()
- else:
+ # We couldn't satisfy the read inline, so make sure we're
+ # listening for new data unless the stream is closed.
+ if not self.closed():
self._add_io_state(ioloop.IOLoop.READ)
def _read_to_buffer(self):
raise StreamBufferFullError("Reached maximum read buffer size")
return bytes_read
- def _run_streaming_callback(self):
- if self._streaming_callback is not None and self._read_buffer_size:
- bytes_to_consume = self._read_buffer_size
- if self._read_bytes is not None:
- bytes_to_consume = min(self._read_bytes, bytes_to_consume)
- self._read_bytes -= bytes_to_consume
- self._run_read_callback(bytes_to_consume, True)
-
def _read_from_buffer(self, pos):
"""Attempts to complete the currently-pending read from the buffer.
"""
self._read_bytes = self._read_delimiter = self._read_regex = None
self._read_partial = False
- self._run_read_callback(pos, False)
+ self._finish_read(pos, False)
def _find_read_pos(self):
"""Attempts to find a position in the read buffer that satisfies
self._write_futures.popleft()
future.set_result(None)
- if not len(self._write_buffer):
- if self._write_callback:
- callback = self._write_callback
- self._write_callback = None
- self._run_callback(callback)
-
def _consume(self, loc):
# Consume loc bytes from the read buffer and return them
if loc == 0:
# connection is first established because we are going to read or write
# immediately anyway. Instead, we insert checks at various times to
# see if the connection is idle and add the read listener then.
- if self._pending_callbacks != 0:
- return
if self._state is None or self._state == ioloop.IOLoop.ERROR:
- if self.closed():
- self._maybe_run_close_callback()
- elif (self._read_buffer_size == 0 and
- self._close_callback is not None):
+ if (not self.closed() and
+ self._read_buffer_size == 0 and
+ self._close_callback is not None):
self._add_io_state(ioloop.IOLoop.READ)
def _add_io_state(self, state):
Implementation notes: Reads and writes have a fast path and a
slow path. The fast path reads synchronously from socket
buffers, while the slow path uses `_add_io_state` to schedule
- an IOLoop callback. Note that in both cases, the callback is
- run asynchronously with `_run_callback`.
+ an IOLoop callback.
To detect closed connections, we must have called
`_add_io_state` at some point, but we want to delay this as
much as possible so we don't have to set an `IOLoop.ERROR`
listener that will be overwritten by the next slow-path
- operation. As long as there are callbacks scheduled for
- fast-path ops, those callbacks may do more reads.
- If a sequence of fast-path ops do not end in a slow-path op,
- (e.g. for an @asynchronous long-poll request), we must add
- the error handler. This is done in `_run_callback` and `write`
- (since the write callback is optional so we can have a
- fast-path write with no `_run_callback`)
+ operation. If a sequence of fast-path ops do not end in a
+ slow-path op, (e.g. for an @asynchronous long-poll request),
+ we must add the error handler.
+
+ TODO: reevaluate this now that callbacks are gone.
+
"""
if self.closed():
# connection has been closed, so there can be no future events
# See https://github.com/tornadoweb/tornado/pull/2008
del data
- def connect(self, address, callback=None, server_hostname=None):
+ def connect(self, address, server_hostname=None):
"""Connects the socket to a remote address without blocking.
May only be called if the socket passed to the constructor was
suitably-configured `ssl.SSLContext` to the
`SSLIOStream` constructor to disable.
- .. deprecated:: 5.1
+ .. versionchanged:: 6.0
- The ``callback`` argument is deprecated and will be removed
- in Tornado 6.0. Use the returned `.Future` instead.
+ The ``callback`` argument was removed. Use the returned
+ `.Future` instead.
"""
self._connecting = True
- if callback is not None:
- warnings.warn("callback argument is deprecated, use returned Future instead",
- DeprecationWarning)
- self._connect_callback = stack_context.wrap(callback)
- future = None
- else:
- future = self._connect_future = Future()
+ future = self._connect_future = Future()
try:
self.socket.connect(address)
except socket.error as e:
``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
suitably-configured `ssl.SSLContext` to disable.
"""
- if (self._read_callback or self._read_future or
- self._write_callback or self._write_futures or
- self._connect_callback or self._connect_future or
- self._pending_callbacks or self._closed or
+ if (self._read_future or
+ self._write_futures or
+ self._connect_future or
+ self._closed or
self._read_buffer or self._write_buffer):
raise ValueError("IOStream is not idle; cannot convert to SSL")
if ssl_options is None:
future = Future()
ssl_stream = SSLIOStream(socket, ssl_options=ssl_options)
- # Wrap the original close callback so we can fail our Future as well.
- # If we had an "unwrap" counterpart to this method we would need
- # to restore the original callback after our Future resolves
- # so that repeated wrap/unwrap calls don't build up layers.
-
- def close_callback():
- if not future.done():
- # Note that unlike most Futures returned by IOStream,
- # this one passes the underlying error through directly
- # instead of wrapping everything in a StreamClosedError
- # with a real_error attribute. This is because once the
- # connection is established it's more helpful to raise
- # the SSLError directly than to hide it behind a
- # StreamClosedError (and the client is expecting SSL
- # issues rather than network issues since this method is
- # named start_tls).
- future.set_exception(ssl_stream.error or StreamClosedError())
- if orig_close_callback is not None:
- orig_close_callback()
- ssl_stream.set_close_callback(close_callback)
- ssl_stream._ssl_connect_callback = lambda: future.set_result(ssl_stream)
+ ssl_stream.set_close_callback(orig_close_callback)
+ ssl_stream._ssl_connect_future = future
ssl_stream.max_buffer_size = self.max_buffer_size
ssl_stream.read_chunk_size = self.read_chunk_size
return future
self.socket.fileno(), errno.errorcode[err])
self.close()
return
- if self._connect_callback is not None:
- callback = self._connect_callback
- self._connect_callback = None
- self._run_callback(callback)
if self._connect_future is not None:
future = self._connect_future
self._connect_future = None
self._ssl_accepting = True
self._handshake_reading = False
self._handshake_writing = False
- self._ssl_connect_callback = None
self._server_hostname = None
# If the socket is already connected, attempt to start the handshake.
if not self._verify_cert(self.socket.getpeercert()):
self.close()
return
- self._run_ssl_connect_callback()
+ self._finish_ssl_connect()
- def _run_ssl_connect_callback(self):
- if self._ssl_connect_callback is not None:
- callback = self._ssl_connect_callback
- self._ssl_connect_callback = None
- self._run_callback(callback)
+ def _finish_ssl_connect(self):
if self._ssl_connect_future is not None:
future = self._ssl_connect_future
self._ssl_connect_future = None
return
super(SSLIOStream, self)._handle_write()
- def connect(self, address, callback=None, server_hostname=None):
+ def connect(self, address, server_hostname=None):
self._server_hostname = server_hostname
# Ignore the result of connect(). If it fails,
# wait_for_handshake will raise an error too. This is
# necessary for the old semantics of the connect callback
# (which takes no arguments). In 6.0 this can be refactored to
# be a regular coroutine.
+ # TODO: This is trickier than it looks, since if write()
+ # is called with a connect() pending, we want the connect
+ # to resolve before the write. Or do we care about this?
+ # (There's a test for it, but I think in practice users
+ # either wait for the connect before performing a write or
+ # they don't care about the connect Future at all)
fut = super(SSLIOStream, self).connect(address)
fut.add_done_callback(lambda f: f.exception())
- return self.wait_for_handshake(callback)
+ return self.wait_for_handshake()
def _handle_connect(self):
# Call the superclass method to check for errors.
do_handshake_on_connect=False)
self._add_io_state(old_state)
- def wait_for_handshake(self, callback=None):
+ def wait_for_handshake(self):
"""Wait for the initial SSL handshake to complete.
If a ``callback`` is given, it will be called with no
.. versionadded:: 4.2
- .. deprecated:: 5.1
+ .. versionchanged:: 6.0
- The ``callback`` argument is deprecated and will be removed
- in Tornado 6.0. Use the returned `.Future` instead.
+ The ``callback`` argument was removed. Use the returned
+ `.Future` instead.
"""
- if (self._ssl_connect_callback is not None or
- self._ssl_connect_future is not None):
+ if self._ssl_connect_future is not None:
raise RuntimeError("Already waiting")
- if callback is not None:
- warnings.warn("callback argument is deprecated, use returned Future instead",
- DeprecationWarning)
- self._ssl_connect_callback = stack_context.wrap(callback)
- future = None
- else:
- future = self._ssl_connect_future = Future()
+ future = self._ssl_connect_future = Future()
if not self._ssl_accepting:
- self._run_ssl_connect_callback()
+ self._finish_ssl_connect()
return future
def write_to_fd(self, data):
from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError, _StreamBuffer
from tornado.httputil import HTTPHeaders
from tornado.locks import Condition, Event
-from tornado.log import gen_log, app_log
+from tornado.log import gen_log
from tornado.netutil import ssl_wrap_socket
-from tornado.stack_context import NullContext
from tornado.tcpserver import TCPServer
from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test # noqa: E501
-from tornado.test.util import (unittest, skipIfNonUnix, refusing_port, skipPypy3V58,
- ignore_deprecation)
+from tornado.test.util import unittest, skipIfNonUnix, refusing_port, skipPypy3V58
from tornado.web import RequestHandler, Application
import errno
import hashlib
first_line = yield stream.read_until(b"\r\n")
self.assertEqual(first_line, b"HTTP/1.1 200 OK\r\n")
- # callback=None is equivalent to no callback.
- header_data = yield stream.read_until(b"\r\n\r\n", callback=None)
+ header_data = yield stream.read_until(b"\r\n\r\n")
headers = HTTPHeaders.parse(header_data.decode('latin1'))
content_length = int(headers['Content-Length'])
body = yield stream.read_bytes(content_length)
ws.close()
rs.close()
- @gen_test
- def test_streaming_callback(self):
- rs, ws = yield self.make_iostream_pair()
- try:
- chunks = []
- cond = Condition()
-
- def streaming_callback(data):
- chunks.append(data)
- cond.notify()
-
- with ignore_deprecation():
- fut = rs.read_bytes(6, streaming_callback=streaming_callback)
- ws.write(b"1234")
- while not chunks:
- yield cond.wait()
- ws.write(b"5678")
- final_data = yield(fut)
- self.assertFalse(final_data)
- self.assertEqual(chunks, [b"1234", b"56"])
-
- # the rest of the last chunk is still in the buffer
- data = yield rs.read_bytes(2)
- self.assertEqual(data, b"78")
- finally:
- rs.close()
- ws.close()
-
- @gen_test
- def test_streaming_callback_with_final_callback(self):
- rs, ws = yield self.make_iostream_pair()
- try:
- chunks = []
- final_called = []
- cond = Condition()
-
- def streaming_callback(data):
- chunks.append(data)
- cond.notify()
-
- def final_callback(data):
- self.assertFalse(data)
- final_called.append(True)
- cond.notify()
- with ignore_deprecation():
- rs.read_bytes(6, callback=final_callback,
- streaming_callback=streaming_callback)
- ws.write(b"1234")
- while not chunks:
- yield cond.wait()
- ws.write(b"5678")
- while not final_called:
- yield cond.wait()
- self.assertEqual(chunks, [b"1234", b"56"])
-
- # the rest of the last chunk is still in the buffer
- data = yield rs.read_bytes(2)
- self.assertEqual(data, b"78")
- finally:
- rs.close()
- ws.close()
-
- @gen_test
- def test_streaming_callback_with_data_in_buffer(self):
- rs, ws = yield self.make_iostream_pair()
- ws.write(b"abcd\r\nefgh")
- data = yield rs.read_until(b"\r\n")
- self.assertEqual(data, b"abcd\r\n")
-
- streaming_fut = Future()
- with ignore_deprecation():
- rs.read_until_close(streaming_callback=streaming_fut.set_result)
- data = yield streaming_fut
- self.assertEqual(data, b"efgh")
- rs.close()
- ws.close()
-
- @gen_test
- def test_streaming_until_close(self):
- rs, ws = yield self.make_iostream_pair()
- try:
- chunks = []
- closed = [False]
- cond = Condition()
-
- def streaming_callback(data):
- chunks.append(data)
- cond.notify()
-
- def close_callback(data):
- assert not data, data
- closed[0] = True
- cond.notify()
- with ignore_deprecation():
- rs.read_until_close(callback=close_callback,
- streaming_callback=streaming_callback)
- ws.write(b"1234")
- while len(chunks) != 1:
- yield cond.wait()
- yield ws.write(b"5678")
- ws.close()
- while not closed[0]:
- yield cond.wait()
- self.assertEqual(chunks, [b"1234", b"5678"])
- finally:
- ws.close()
- rs.close()
-
- @gen_test
- def test_streaming_until_close_future(self):
- rs, ws = yield self.make_iostream_pair()
- try:
- chunks = []
-
- @gen.coroutine
- def rs_task():
- with ignore_deprecation():
- yield rs.read_until_close(streaming_callback=chunks.append)
-
- @gen.coroutine
- def ws_task():
- yield ws.write(b"1234")
- yield gen.sleep(0.01)
- yield ws.write(b"5678")
- ws.close()
-
- yield [rs_task(), ws_task()]
- self.assertEqual(chunks, [b"1234", b"5678"])
- finally:
- ws.close()
- rs.close()
-
- @gen_test
- def test_delayed_close_callback(self):
- # The scenario: Server closes the connection while there is a pending
- # read that can be served out of buffered data. The client does not
- # run the close_callback as soon as it detects the close, but rather
- # defers it until after the buffered read has finished.
- rs, ws = yield self.make_iostream_pair()
- try:
- event = Event()
- rs.set_close_callback(event.set)
- ws.write(b"12")
- chunks = []
-
- def callback1(data):
- chunks.append(data)
- with ignore_deprecation():
- rs.read_bytes(1, callback2)
- ws.close()
-
- def callback2(data):
- chunks.append(data)
- with ignore_deprecation():
- rs.read_bytes(1, callback1)
- yield event.wait() # stopped by close_callback
- self.assertEqual(chunks, [b"1", b"2"])
- finally:
- ws.close()
- rs.close()
-
@gen_test
def test_future_delayed_close_callback(self):
- # Same as test_delayed_close_callback, but with the future interface.
+ # The scenario: Server closes the connection while there is a pending
+ # read that can be served out of buffered data. The client does not
+ # run the close_callback as soon as it detects the close, but rather
+ # defers it until after the buffered read has finished.
ws.close()
rs.close()
- @gen_test
- def test_streaming_read_until_close_after_close(self):
- # Same as the preceding test but with a streaming_callback.
- # All data should go through the streaming callback,
- # and the final read callback just gets an empty string.
- rs, ws = yield self.make_iostream_pair()
- try:
- ws.write(b"1234")
- ws.close()
- data = yield rs.read_bytes(1)
- self.assertEqual(data, b"1")
- streaming_data = []
- final_future = Future()
- with ignore_deprecation():
- rs.read_until_close(final_future.set_result,
- streaming_callback=streaming_data.append)
- final_data = yield final_future
- self.assertEqual(b'', final_data)
- self.assertEqual(b''.join(streaming_data), b"234")
- finally:
- ws.close()
- rs.close()
-
@gen_test
def test_large_read_until(self):
# Performance test: read_until used to have a quadratic component
ws.close()
rs.close()
- @gen_test
- def test_read_until_max_bytes_inline_legacy(self):
- rs, ws = yield self.make_iostream_pair()
- closed = Event()
- rs.set_close_callback(closed.set)
- try:
- # Similar to the error case in the previous test, but the
- # ws writes first so rs reads are satisfied
- # inline. For consistency with the out-of-line case, we
- # do not raise the error synchronously.
- ws.write(b"123456")
- with ExpectLog(gen_log, "Unsatisfiable read"):
- with ignore_deprecation():
- rs.read_until(b"def", callback=lambda x: self.fail(), max_bytes=5)
- yield closed.wait()
- finally:
- ws.close()
- rs.close()
-
@gen_test
def test_read_until_max_bytes_inline(self):
rs, ws = yield self.make_iostream_pair()
listener.close()
raise gen.Return((server_stream, client_stream))
- def test_connection_refused_legacy(self):
- # When a connection is refused, the connect callback should not
- # be run. (The kqueue IOLoop used to behave differently from the
- # epoll IOLoop in this respect)
- cleanup_func, port = refusing_port()
- self.addCleanup(cleanup_func)
- stream = IOStream(socket.socket())
- self.connect_called = False
-
- def connect_callback():
- self.connect_called = True
- self.stop()
- stream.set_close_callback(self.stop)
- # log messages vary by platform and ioloop implementation
- with ExpectLog(gen_log, ".*", required=False):
- with ignore_deprecation():
- stream.connect(("127.0.0.1", port), connect_callback)
- self.wait()
- self.assertFalse(self.connect_called)
- self.assertTrue(isinstance(stream.error, socket.error), stream.error)
- if sys.platform != 'cygwin':
- _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
- if hasattr(errno, "WSAECONNREFUSED"):
- _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
- # cygwin's errnos don't match those used on native windows python
- self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
-
@gen_test
def test_connection_refused(self):
# When a connection is refused, the connect callback should not
yield stream.connect(('localhost', 80))
self.assertTrue(isinstance(stream.error, socket.gaierror))
- @gen_test
- def test_read_callback_error(self):
- # Test that IOStream sets its exc_info when a read callback throws
- server, client = yield self.make_iostream_pair()
- try:
- closed = Event()
- server.set_close_callback(closed.set)
- with ExpectLog(
- app_log, "(Uncaught exception|Exception in callback)"
- ):
- # Clear ExceptionStackContext so IOStream catches error
- with NullContext():
- with ignore_deprecation():
- server.read_bytes(1, callback=lambda data: 1 / 0)
- client.write(b"1")
- yield closed.wait()
- self.assertTrue(isinstance(server.error, ZeroDivisionError))
- finally:
- server.close()
- client.close()
-
@unittest.skipIf(mock is None, 'mock package not present')
@gen_test
def test_read_until_close_with_error(self):
if client is not None:
client.close()
- @gen_test
- def test_wait_for_handshake_callback(self):
- test = self
- handshake_future = Future()
-
- class TestServer(TCPServer):
- def handle_stream(self, stream, address):
- # The handshake has not yet completed.
- test.assertIsNone(stream.socket.cipher())
- self.stream = stream
- with ignore_deprecation():
- stream.wait_for_handshake(self.handshake_done)
-
- def handshake_done(self):
- # Now the handshake is done and ssl information is available.
- test.assertIsNotNone(self.stream.socket.cipher())
- handshake_future.set_result(None)
-
- yield self.connect_to_server(TestServer)
- yield handshake_future
-
@gen_test
def test_wait_for_handshake_future(self):
test = self