From: Ben Darnell Date: Sat, 7 Jul 2018 04:13:36 +0000 (-0400) Subject: iostream: Remove deprecated interfaces X-Git-Tag: v6.0.0b1~48^2~14 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a55c7fe7;p=thirdparty%2Ftornado.git iostream: Remove deprecated interfaces This is the riskiest change in this series since it does some internal refactoring to get rid of some callback-related machinery. --- diff --git a/tornado/iostream.py b/tornado/iostream.py index 89e1e234e..7d627483e 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -31,9 +31,9 @@ import io import numbers import os import socket +import ssl import sys import re -import warnings from tornado.concurrent import Future from tornado import ioloop @@ -47,12 +47,6 @@ try: except ImportError: _set_nonblocking = None -try: - import ssl -except ImportError: - # ssl is not available on Google App Engine - ssl = None - # These errnos indicate that a non-blocking operation must be retried # at a later time. On most platforms they're the same value, but on # some they differ. @@ -211,20 +205,19 @@ class _StreamBuffer(object): class BaseIOStream(object): """A utility class to write to and read from a non-blocking file or socket. - We support a non-blocking ``write()`` and a family of ``read_*()`` methods. - All of the methods take an optional ``callback`` argument and return a - `.Future` only if no callback is given. When the operation completes, - the callback will be run or the `.Future` will resolve with the data - read (or ``None`` for ``write()``). All outstanding ``Futures`` will - resolve with a `StreamClosedError` when the stream is closed; users - of the callback interface will be notified via - `.BaseIOStream.set_close_callback` instead. + We support a non-blocking ``write()`` and a family of ``read_*()`` + methods. When the operation completes, the `.Future` will resolve + with the data read (or ``None`` for ``write()``). All outstanding + ``Futures`` will resolve with a `StreamClosedError` when the + stream is closed; `.BaseIOStream.set_close_callback` can also be used + to be notified of a closed stream. When a stream is closed due to an error, the IOStream's ``error`` attribute contains the exception object. Subclasses must implement `fileno`, `close_fd`, `write_to_fd`, `read_from_fd`, and optionally `get_fd_error`. + """ def __init__(self, max_buffer_size=None, read_chunk_size=None, max_write_buffer_size=None): @@ -266,22 +259,17 @@ class BaseIOStream(object): self._read_bytes = None self._read_partial = False self._read_until_close = False - self._read_callback = None self._read_future = None - self._streaming_callback = None - self._write_callback = None self._write_futures = collections.deque() self._close_callback = None - self._connect_callback = None self._connect_future = None # _ssl_connect_future should be defined in SSLIOStream - # but it's here so we can clean it up in maybe_run_close_callback. + # but it's here so we can clean it up in _signal_closed # TODO: refactor that so subclasses can add additional futures # to be cancelled. self._ssl_connect_future = None self._connecting = False self._state = None - self._pending_callbacks = 0 self._closed = False def fileno(self): @@ -328,13 +316,11 @@ class BaseIOStream(object): """ return None - def read_until_regex(self, regex, callback=None, max_bytes=None): + def read_until_regex(self, regex, max_bytes=None): """Asynchronously read until we have matched the given regex. The result includes the data that matches the regex and anything - that came before it. If a callback is given, it will be run - with the data as an argument; if not, this method returns a - `.Future`. + that came before it. If ``max_bytes`` is not None, the connection will be closed if more than ``max_bytes`` bytes have been read and the regex is @@ -344,13 +330,13 @@ class BaseIOStream(object): Added the ``max_bytes`` argument. The ``callback`` argument is now optional and a `.Future` will be returned if it is omitted. - .. deprecated:: 5.1 + .. versionchanged:: 6.0 - The ``callback`` argument is deprecated and will be removed - in Tornado 6.0. Use the returned `.Future` instead. + The ``callback`` argument was removed. Use the returned + `.Future` instead. """ - future = self._set_read_callback(callback) + future = self._start_read() self._read_regex = re.compile(regex) self._read_max_bytes = max_bytes try: @@ -361,19 +347,16 @@ class BaseIOStream(object): self.close(exc_info=e) return future except: - if future is not None: - # Ensure that the future doesn't log an error because its - # failure was never examined. - future.add_done_callback(lambda f: f.exception()) + # Ensure that the future doesn't log an error because its + # failure was never examined. + future.add_done_callback(lambda f: f.exception()) raise return future - def read_until(self, delimiter, callback=None, max_bytes=None): + def read_until(self, delimiter, max_bytes=None): """Asynchronously read until we have found the given delimiter. The result includes all the data read including the delimiter. - If a callback is given, it will be run with the data as an argument; - if not, this method returns a `.Future`. If ``max_bytes`` is not None, the connection will be closed if more than ``max_bytes`` bytes have been read and the delimiter @@ -383,12 +366,12 @@ class BaseIOStream(object): Added the ``max_bytes`` argument. The ``callback`` argument is now optional and a `.Future` will be returned if it is omitted. - .. deprecated:: 5.1 + .. versionchanged:: 6.0 - The ``callback`` argument is deprecated and will be removed - in Tornado 6.0. Use the returned `.Future` instead. + The ``callback`` argument was removed. Use the returned + `.Future` instead. """ - future = self._set_read_callback(callback) + future = self._start_read() self._read_delimiter = delimiter self._read_max_bytes = max_bytes try: @@ -399,58 +382,42 @@ class BaseIOStream(object): self.close(exc_info=e) return future except: - if future is not None: - future.add_done_callback(lambda f: f.exception()) + future.add_done_callback(lambda f: f.exception()) raise return future - def read_bytes(self, num_bytes, callback=None, streaming_callback=None, - partial=False): + def read_bytes(self, num_bytes, partial=False): """Asynchronously read a number of bytes. - If a ``streaming_callback`` is given, it will be called with chunks - of data as they become available, and the final result will be empty. - Otherwise, the result is all the data that was read. - If a callback is given, it will be run with the data as an argument; - if not, this method returns a `.Future`. - - If ``partial`` is true, the callback is run as soon as we have + If ``partial`` is true, data is returned as soon as we have any bytes to return (but never more than ``num_bytes``) .. versionchanged:: 4.0 Added the ``partial`` argument. The callback argument is now optional and a `.Future` will be returned if it is omitted. - .. deprecated:: 5.1 + .. versionchanged:: 6.0 - The ``callback`` and ``streaming_callback`` arguments are - deprecated and will be removed in Tornado 6.0. Use the - returned `.Future` (and ``partial=True`` for - ``streaming_callback``) instead. + The ``callback`` and ``streaming_callback`` arguments have + been removed. Use the returned `.Future` (and + ``partial=True`` for ``streaming_callback``) instead. """ - future = self._set_read_callback(callback) + future = self._start_read() assert isinstance(num_bytes, numbers.Integral) self._read_bytes = num_bytes self._read_partial = partial - if streaming_callback is not None: - warnings.warn("streaming_callback is deprecated, use partial instead", - DeprecationWarning) - self._streaming_callback = stack_context.wrap(streaming_callback) try: self._try_inline_read() except: - if future is not None: - future.add_done_callback(lambda f: f.exception()) + future.add_done_callback(lambda f: f.exception()) raise return future - def read_into(self, buf, callback=None, partial=False): + def read_into(self, buf, partial=False): """Asynchronously read a number of bytes. ``buf`` must be a writable buffer into which data will be read. - If a callback is given, it will be run with the number of read - bytes as an argument; if not, this method returns a `.Future`. If ``partial`` is true, the callback is run as soon as any bytes have been read. Otherwise, it is run when the ``buf`` has been @@ -458,13 +425,13 @@ class BaseIOStream(object): .. versionadded:: 5.0 - .. deprecated:: 5.1 + .. versionchanged:: 6.0 - The ``callback`` argument is deprecated and will be removed - in Tornado 6.0. Use the returned `.Future` instead. + The ``callback`` argument was removed. Use the returned + `.Future` instead. """ - future = self._set_read_callback(callback) + future = self._start_read() # First copy data already in read buffer available_bytes = self._read_buffer_size @@ -490,68 +457,45 @@ class BaseIOStream(object): try: self._try_inline_read() except: - if future is not None: - future.add_done_callback(lambda f: f.exception()) + future.add_done_callback(lambda f: f.exception()) raise return future - def read_until_close(self, callback=None, streaming_callback=None): + def read_until_close(self): """Asynchronously reads all data from the socket until it is closed. - If a ``streaming_callback`` is given, it will be called with chunks - of data as they become available, and the final result will be empty. - Otherwise, the result is all the data that was read. - If a callback is given, it will be run with the data as an argument; - if not, this method returns a `.Future`. - - Note that if a ``streaming_callback`` is used, data will be - read from the socket as quickly as it becomes available; there - is no way to apply backpressure or cancel the reads. If flow - control or cancellation are desired, use a loop with - `read_bytes(partial=True) <.read_bytes>` instead. + This will buffer all available data until ``max_buffer_size`` + is reached. If flow control or cancellation are desired, use a + loop with `read_bytes(partial=True) <.read_bytes>` instead. .. versionchanged:: 4.0 The callback argument is now optional and a `.Future` will be returned if it is omitted. - .. deprecated:: 5.1 + .. versionchanged:: 6.0 - The ``callback`` and ``streaming_callback`` arguments are - deprecated and will be removed in Tornado 6.0. Use the - returned `.Future` (and `read_bytes` with ``partial=True`` - for ``streaming_callback``) instead. + The ``callback`` and ``streaming_callback`` arguments have + been removed. Use the returned `.Future` (and `read_bytes` + with ``partial=True`` for ``streaming_callback``) instead. """ - future = self._set_read_callback(callback) - if streaming_callback is not None: - warnings.warn("streaming_callback is deprecated, use read_bytes(partial=True) instead", - DeprecationWarning) - self._streaming_callback = stack_context.wrap(streaming_callback) + future = self._start_read() if self.closed(): - if self._streaming_callback is not None: - self._run_read_callback(self._read_buffer_size, True) - self._run_read_callback(self._read_buffer_size, False) + self._finish_read(self._read_buffer_size, False) return future self._read_until_close = True try: self._try_inline_read() except: - if future is not None: - future.add_done_callback(lambda f: f.exception()) + future.add_done_callback(lambda f: f.exception()) raise return future - def write(self, data, callback=None): + def write(self, data): """Asynchronously write the given data to this stream. - If ``callback`` is given, we call it when all of the buffered write - data has been successfully written to the stream. If there was - previously buffered write data and an old write callback, that - callback is simply overwritten with this new callback. - - If no ``callback`` is given, this method returns a `.Future` that - resolves (with a result of ``None``) when the write has been - completed. + This method returns a `.Future` that resolves (with a result + of ``None``) when the write has been completed. The ``data`` argument may be of type `bytes` or `memoryview`. @@ -561,10 +505,10 @@ class BaseIOStream(object): .. versionchanged:: 4.5 Added support for `memoryview` arguments. - .. deprecated:: 5.1 + .. versionchanged:: 6.0 - The ``callback`` argument is deprecated and will be removed - in Tornado 6.0. Use the returned `.Future` instead. + The ``callback`` argument was removed. Use the returned + `.Future` instead. """ self._check_closed() @@ -574,15 +518,9 @@ class BaseIOStream(object): raise StreamBufferFullError("Reached maximum write buffer size") self._write_buffer.append(data) self._total_write_index += len(data) - if callback is not None: - warnings.warn("callback argument is deprecated, use returned Future instead", - DeprecationWarning) - self._write_callback = stack_context.wrap(callback) - future = None - else: - future = Future() - future.add_done_callback(lambda f: f.exception()) - self._write_futures.append((self._total_write_index, future)) + future = Future() + future.add_done_callback(lambda f: f.exception()) + self._write_futures.append((self._total_write_index, future)) if not self._connecting: self._handle_write() if self._write_buffer: @@ -600,7 +538,7 @@ class BaseIOStream(object): closed while no other read or write is in progress. Unlike other callback-based interfaces, ``set_close_callback`` - will not be removed in Tornado 6.0. + was not removed in Tornado 6.0. """ self._close_callback = stack_context.wrap(callback) self._maybe_add_error_listener() @@ -623,51 +561,46 @@ class BaseIOStream(object): if any(exc_info): self.error = exc_info[1] if self._read_until_close: - if (self._streaming_callback is not None and - self._read_buffer_size): - self._run_read_callback(self._read_buffer_size, True) self._read_until_close = False - self._run_read_callback(self._read_buffer_size, False) + self._finish_read(self._read_buffer_size, False) if self._state is not None: self.io_loop.remove_handler(self.fileno()) self._state = None self.close_fd() self._closed = True - self._maybe_run_close_callback() - - def _maybe_run_close_callback(self): - # If there are pending callbacks, don't run the close callback - # until they're done (see _maybe_add_error_handler) - if self.closed() and self._pending_callbacks == 0: - futures = [] - if self._read_future is not None: - futures.append(self._read_future) - self._read_future = None - futures += [future for _, future in self._write_futures] - self._write_futures.clear() - if self._connect_future is not None: - futures.append(self._connect_future) - self._connect_future = None - if self._ssl_connect_future is not None: - futures.append(self._ssl_connect_future) - self._ssl_connect_future = None - for future in futures: - future.set_exception(StreamClosedError(real_error=self.error)) - future.exception() - if self._close_callback is not None: - cb = self._close_callback - self._close_callback = None - self._run_callback(cb) - # Delete any unfinished callbacks to break up reference cycles. - self._read_callback = self._write_callback = None - # Clear the buffers so they can be cleared immediately even - # if the IOStream object is kept alive by a reference cycle. - # TODO: Clear the read buffer too; it currently breaks some tests. - self._write_buffer = None + self._signal_closed() + + def _signal_closed(self): + futures = [] + if self._read_future is not None: + futures.append(self._read_future) + self._read_future = None + futures += [future for _, future in self._write_futures] + self._write_futures.clear() + if self._connect_future is not None: + futures.append(self._connect_future) + self._connect_future = None + for future in futures: + future.set_exception(StreamClosedError(real_error=self.error)) + future.exception() + if self._ssl_connect_future is not None: + # _ssl_connect_future expects to see the real exception (typically + # an ssl.SSLError), not just StreamClosedError. + self._ssl_connect_future.set_exception(self.error) + self._ssl_connect_future.exception() + self._ssl_connect_future = None + if self._close_callback is not None: + cb = self._close_callback + self._close_callback = None + self.io_loop.add_callback(cb) + # Clear the buffers so they can be cleared immediately even + # if the IOStream object is kept alive by a reference cycle. + # TODO: Clear the read buffer too; it currently breaks some tests. + self._write_buffer = None def reading(self): """Returns true if we are currently reading from the stream.""" - return self._read_callback is not None or self._read_future is not None + return self._read_future is not None def writing(self): """Returns true if we are currently writing to the stream.""" @@ -745,100 +678,47 @@ class BaseIOStream(object): self.close(exc_info=e) raise - def _run_callback(self, callback, *args): - def wrapper(): - self._pending_callbacks -= 1 - try: - return callback(*args) - except Exception as e: - app_log.error("Uncaught exception, closing connection.", - exc_info=True) - # Close the socket on an uncaught exception from a user callback - # (It would eventually get closed when the socket object is - # gc'd, but we don't want to rely on gc happening before we - # run out of file descriptors) - self.close(exc_info=e) - # Re-raise the exception so that IOLoop.handle_callback_exception - # can see it and log the error - raise - finally: - self._maybe_add_error_listener() - # We schedule callbacks to be run on the next IOLoop iteration - # rather than running them directly for several reasons: - # * Prevents unbounded stack growth when a callback calls an - # IOLoop operation that immediately runs another callback - # * Provides a predictable execution context for e.g. - # non-reentrant mutexes - # * Ensures that the try/except in wrapper() is run outside - # of the application's StackContexts - with stack_context.NullContext(): - # stack_context was already captured in callback, we don't need to - # capture it again for IOStream's wrapper. This is especially - # important if the callback was pre-wrapped before entry to - # IOStream (as in HTTPConnection._header_callback), as we could - # capture and leak the wrong context here. - self._pending_callbacks += 1 - self.io_loop.add_callback(wrapper) - def _read_to_buffer_loop(self): # This method is called from _handle_read and _try_inline_read. - try: - if self._read_bytes is not None: - target_bytes = self._read_bytes - elif self._read_max_bytes is not None: - target_bytes = self._read_max_bytes - elif self.reading(): - # For read_until without max_bytes, or - # read_until_close, read as much as we can before - # scanning for the delimiter. - target_bytes = None - else: - target_bytes = 0 - next_find_pos = 0 - # Pretend to have a pending callback so that an EOF in - # _read_to_buffer doesn't trigger an immediate close - # callback. At the end of this method we'll either - # establish a real pending callback via - # _read_from_buffer or run the close callback. - # - # We need two try statements here so that - # pending_callbacks is decremented before the `except` - # clause below (which calls `close` and does need to - # trigger the callback) - self._pending_callbacks += 1 - while not self.closed(): - # Read from the socket until we get EWOULDBLOCK or equivalent. - # SSL sockets do some internal buffering, and if the data is - # sitting in the SSL object's buffer select() and friends - # can't see it; the only way to find out if it's there is to - # try to read it. - if self._read_to_buffer() == 0: - break + if self._read_bytes is not None: + target_bytes = self._read_bytes + elif self._read_max_bytes is not None: + target_bytes = self._read_max_bytes + elif self.reading(): + # For read_until without max_bytes, or + # read_until_close, read as much as we can before + # scanning for the delimiter. + target_bytes = None + else: + target_bytes = 0 + next_find_pos = 0 + while not self.closed(): + # Read from the socket until we get EWOULDBLOCK or equivalent. + # SSL sockets do some internal buffering, and if the data is + # sitting in the SSL object's buffer select() and friends + # can't see it; the only way to find out if it's there is to + # try to read it. + if self._read_to_buffer() == 0: + break - self._run_streaming_callback() + # If we've read all the bytes we can use, break out of + # this loop. - # If we've read all the bytes we can use, break out of - # this loop. We can't just call read_from_buffer here - # because of subtle interactions with the - # pending_callback and error_listener mechanisms. - # - # If we've reached target_bytes, we know we're done. - if (target_bytes is not None and - self._read_buffer_size >= target_bytes): - break + # If we've reached target_bytes, we know we're done. + if (target_bytes is not None and + self._read_buffer_size >= target_bytes): + break - # Otherwise, we need to call the more expensive find_read_pos. - # It's inefficient to do this on every read, so instead - # do it on the first read and whenever the read buffer - # size has doubled. - if self._read_buffer_size >= next_find_pos: - pos = self._find_read_pos() - if pos is not None: - return pos - next_find_pos = self._read_buffer_size * 2 - return self._find_read_pos() - finally: - self._pending_callbacks -= 1 + # Otherwise, we need to call the more expensive find_read_pos. + # It's inefficient to do this on every read, so instead + # do it on the first read and whenever the read buffer + # size has doubled. + if self._read_buffer_size >= next_find_pos: + pos = self._find_read_pos() + if pos is not None: + return pos + next_find_pos = self._read_buffer_size * 2 + return self._find_read_pos() def _handle_read(self): try: @@ -851,22 +731,13 @@ class BaseIOStream(object): return if pos is not None: self._read_from_buffer(pos) - return - else: - self._maybe_run_close_callback() - def _set_read_callback(self, callback): - assert self._read_callback is None, "Already reading" + def _start_read(self): assert self._read_future is None, "Already reading" - if callback is not None: - warnings.warn("callbacks are deprecated, use returned Future instead", - DeprecationWarning) - self._read_callback = stack_context.wrap(callback) - else: - self._read_future = Future() + self._read_future = Future() return self._read_future - def _run_read_callback(self, size, streaming): + def _finish_read(self, size, streaming): if self._user_read_buffer: self._read_buffer = self._after_user_read_buffer or bytearray() self._after_user_read_buffer = None @@ -876,24 +747,11 @@ class BaseIOStream(object): result = size else: result = self._consume(size) - if streaming: - callback = self._streaming_callback - else: - callback = self._read_callback - self._read_callback = self._streaming_callback = None - if self._read_future is not None: - assert callback is None - future = self._read_future - self._read_future = None - - future.set_result(result) - if callback is not None: - assert (self._read_future is None) or streaming - self._run_callback(callback, result) - else: - # If we scheduled a callback, we will add the error listener - # afterwards. If we didn't, we have to do it now. - self._maybe_add_error_listener() + if self._read_future is not None: + future = self._read_future + self._read_future = None + future.set_result(result) + self._maybe_add_error_listener() def _try_inline_read(self): """Attempt to complete the current read operation from buffered data. @@ -903,29 +761,18 @@ class BaseIOStream(object): listening for reads on the socket. """ # See if we've already got the data from a previous read - self._run_streaming_callback() pos = self._find_read_pos() if pos is not None: self._read_from_buffer(pos) return self._check_closed() - try: - pos = self._read_to_buffer_loop() - except Exception: - # If there was an in _read_to_buffer, we called close() already, - # but couldn't run the close callback because of _pending_callbacks. - # Before we escape from this function, run the close callback if - # applicable. - self._maybe_run_close_callback() - raise + pos = self._read_to_buffer_loop() if pos is not None: self._read_from_buffer(pos) return - # We couldn't satisfy the read inline, so either close the stream - # or listen for new data. - if self.closed(): - self._maybe_run_close_callback() - else: + # We couldn't satisfy the read inline, so make sure we're + # listening for new data unless the stream is closed. + if not self.closed(): self._add_io_state(ioloop.IOLoop.READ) def _read_to_buffer(self): @@ -974,14 +821,6 @@ class BaseIOStream(object): raise StreamBufferFullError("Reached maximum read buffer size") return bytes_read - def _run_streaming_callback(self): - if self._streaming_callback is not None and self._read_buffer_size: - bytes_to_consume = self._read_buffer_size - if self._read_bytes is not None: - bytes_to_consume = min(self._read_bytes, bytes_to_consume) - self._read_bytes -= bytes_to_consume - self._run_read_callback(bytes_to_consume, True) - def _read_from_buffer(self, pos): """Attempts to complete the currently-pending read from the buffer. @@ -990,7 +829,7 @@ class BaseIOStream(object): """ self._read_bytes = self._read_delimiter = self._read_regex = None self._read_partial = False - self._run_read_callback(pos, False) + self._finish_read(pos, False) def _find_read_pos(self): """Attempts to find a position in the read buffer that satisfies @@ -1082,12 +921,6 @@ class BaseIOStream(object): self._write_futures.popleft() future.set_result(None) - if not len(self._write_buffer): - if self._write_callback: - callback = self._write_callback - self._write_callback = None - self._run_callback(callback) - def _consume(self, loc): # Consume loc bytes from the read buffer and return them if loc == 0: @@ -1118,13 +951,10 @@ class BaseIOStream(object): # connection is first established because we are going to read or write # immediately anyway. Instead, we insert checks at various times to # see if the connection is idle and add the read listener then. - if self._pending_callbacks != 0: - return if self._state is None or self._state == ioloop.IOLoop.ERROR: - if self.closed(): - self._maybe_run_close_callback() - elif (self._read_buffer_size == 0 and - self._close_callback is not None): + if (not self.closed() and + self._read_buffer_size == 0 and + self._close_callback is not None): self._add_io_state(ioloop.IOLoop.READ) def _add_io_state(self, state): @@ -1133,20 +963,18 @@ class BaseIOStream(object): Implementation notes: Reads and writes have a fast path and a slow path. The fast path reads synchronously from socket buffers, while the slow path uses `_add_io_state` to schedule - an IOLoop callback. Note that in both cases, the callback is - run asynchronously with `_run_callback`. + an IOLoop callback. To detect closed connections, we must have called `_add_io_state` at some point, but we want to delay this as much as possible so we don't have to set an `IOLoop.ERROR` listener that will be overwritten by the next slow-path - operation. As long as there are callbacks scheduled for - fast-path ops, those callbacks may do more reads. - If a sequence of fast-path ops do not end in a slow-path op, - (e.g. for an @asynchronous long-poll request), we must add - the error handler. This is done in `_run_callback` and `write` - (since the write callback is optional so we can have a - fast-path write with no `_run_callback`) + operation. If a sequence of fast-path ops do not end in a + slow-path op, (e.g. for an @asynchronous long-poll request), + we must add the error handler. + + TODO: reevaluate this now that callbacks are gone. + """ if self.closed(): # connection has been closed, so there can be no future events @@ -1252,7 +1080,7 @@ class IOStream(BaseIOStream): # See https://github.com/tornadoweb/tornado/pull/2008 del data - def connect(self, address, callback=None, server_hostname=None): + def connect(self, address, server_hostname=None): """Connects the socket to a remote address without blocking. May only be called if the socket passed to the constructor was @@ -1291,20 +1119,14 @@ class IOStream(BaseIOStream): suitably-configured `ssl.SSLContext` to the `SSLIOStream` constructor to disable. - .. deprecated:: 5.1 + .. versionchanged:: 6.0 - The ``callback`` argument is deprecated and will be removed - in Tornado 6.0. Use the returned `.Future` instead. + The ``callback`` argument was removed. Use the returned + `.Future` instead. """ self._connecting = True - if callback is not None: - warnings.warn("callback argument is deprecated, use returned Future instead", - DeprecationWarning) - self._connect_callback = stack_context.wrap(callback) - future = None - else: - future = self._connect_future = Future() + future = self._connect_future = Future() try: self.socket.connect(address) except socket.error as e: @@ -1360,10 +1182,10 @@ class IOStream(BaseIOStream): ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a suitably-configured `ssl.SSLContext` to disable. """ - if (self._read_callback or self._read_future or - self._write_callback or self._write_futures or - self._connect_callback or self._connect_future or - self._pending_callbacks or self._closed or + if (self._read_future or + self._write_futures or + self._connect_future or + self._closed or self._read_buffer or self._write_buffer): raise ValueError("IOStream is not idle; cannot convert to SSL") if ssl_options is None: @@ -1384,27 +1206,8 @@ class IOStream(BaseIOStream): future = Future() ssl_stream = SSLIOStream(socket, ssl_options=ssl_options) - # Wrap the original close callback so we can fail our Future as well. - # If we had an "unwrap" counterpart to this method we would need - # to restore the original callback after our Future resolves - # so that repeated wrap/unwrap calls don't build up layers. - - def close_callback(): - if not future.done(): - # Note that unlike most Futures returned by IOStream, - # this one passes the underlying error through directly - # instead of wrapping everything in a StreamClosedError - # with a real_error attribute. This is because once the - # connection is established it's more helpful to raise - # the SSLError directly than to hide it behind a - # StreamClosedError (and the client is expecting SSL - # issues rather than network issues since this method is - # named start_tls). - future.set_exception(ssl_stream.error or StreamClosedError()) - if orig_close_callback is not None: - orig_close_callback() - ssl_stream.set_close_callback(close_callback) - ssl_stream._ssl_connect_callback = lambda: future.set_result(ssl_stream) + ssl_stream.set_close_callback(orig_close_callback) + ssl_stream._ssl_connect_future = future ssl_stream.max_buffer_size = self.max_buffer_size ssl_stream.read_chunk_size = self.read_chunk_size return future @@ -1428,10 +1231,6 @@ class IOStream(BaseIOStream): self.socket.fileno(), errno.errorcode[err]) self.close() return - if self._connect_callback is not None: - callback = self._connect_callback - self._connect_callback = None - self._run_callback(callback) if self._connect_future is not None: future = self._connect_future self._connect_future = None @@ -1473,7 +1272,6 @@ class SSLIOStream(IOStream): self._ssl_accepting = True self._handshake_reading = False self._handshake_writing = False - self._ssl_connect_callback = None self._server_hostname = None # If the socket is already connected, attempt to start the handshake. @@ -1537,13 +1335,9 @@ class SSLIOStream(IOStream): if not self._verify_cert(self.socket.getpeercert()): self.close() return - self._run_ssl_connect_callback() + self._finish_ssl_connect() - def _run_ssl_connect_callback(self): - if self._ssl_connect_callback is not None: - callback = self._ssl_connect_callback - self._ssl_connect_callback = None - self._run_callback(callback) + def _finish_ssl_connect(self): if self._ssl_connect_future is not None: future = self._ssl_connect_future self._ssl_connect_future = None @@ -1588,16 +1382,22 @@ class SSLIOStream(IOStream): return super(SSLIOStream, self)._handle_write() - def connect(self, address, callback=None, server_hostname=None): + def connect(self, address, server_hostname=None): self._server_hostname = server_hostname # Ignore the result of connect(). If it fails, # wait_for_handshake will raise an error too. This is # necessary for the old semantics of the connect callback # (which takes no arguments). In 6.0 this can be refactored to # be a regular coroutine. + # TODO: This is trickier than it looks, since if write() + # is called with a connect() pending, we want the connect + # to resolve before the write. Or do we care about this? + # (There's a test for it, but I think in practice users + # either wait for the connect before performing a write or + # they don't care about the connect Future at all) fut = super(SSLIOStream, self).connect(address) fut.add_done_callback(lambda f: f.exception()) - return self.wait_for_handshake(callback) + return self.wait_for_handshake() def _handle_connect(self): # Call the superclass method to check for errors. @@ -1622,7 +1422,7 @@ class SSLIOStream(IOStream): do_handshake_on_connect=False) self._add_io_state(old_state) - def wait_for_handshake(self, callback=None): + def wait_for_handshake(self): """Wait for the initial SSL handshake to complete. If a ``callback`` is given, it will be called with no @@ -1641,24 +1441,17 @@ class SSLIOStream(IOStream): .. versionadded:: 4.2 - .. deprecated:: 5.1 + .. versionchanged:: 6.0 - The ``callback`` argument is deprecated and will be removed - in Tornado 6.0. Use the returned `.Future` instead. + The ``callback`` argument was removed. Use the returned + `.Future` instead. """ - if (self._ssl_connect_callback is not None or - self._ssl_connect_future is not None): + if self._ssl_connect_future is not None: raise RuntimeError("Already waiting") - if callback is not None: - warnings.warn("callback argument is deprecated, use returned Future instead", - DeprecationWarning) - self._ssl_connect_callback = stack_context.wrap(callback) - future = None - else: - future = self._ssl_connect_future = Future() + future = self._ssl_connect_future = Future() if not self._ssl_accepting: - self._run_ssl_connect_callback() + self._finish_ssl_connect() return future def write_to_fd(self, data): diff --git a/tornado/test/concurrent_test.py b/tornado/test/concurrent_test.py index 6bdecf544..99402d167 100644 --- a/tornado/test/concurrent_test.py +++ b/tornado/test/concurrent_test.py @@ -85,33 +85,6 @@ class BaseCapClient(object): raise CapError(message) -class ManualCapClient(BaseCapClient): - def capitalize(self, request_data, callback=None): - logging.debug("capitalize") - self.request_data = request_data - self.stream = IOStream(socket.socket()) - self.stream.connect(('127.0.0.1', self.port), - callback=self.handle_connect) - self.future = Future() - if callback is not None: - self.future.add_done_callback( - stack_context.wrap(lambda future: callback(future.result()))) - return self.future - - def handle_connect(self): - logging.debug("handle_connect") - self.stream.write(utf8(self.request_data + "\n")) - self.stream.read_until(b'\n', callback=self.handle_read) - - def handle_read(self, data): - logging.debug("handle_read") - self.stream.close() - try: - self.future.set_result(self.process_response(data)) - except CapError as e: - self.future.set_exception(e) - - class GeneratorCapClient(BaseCapClient): @gen.coroutine def capitalize(self, request_data): @@ -166,20 +139,6 @@ class ClientTestMixin(object): self.io_loop.run_sync(f) -class ManualClientTest(ClientTestMixin, AsyncTestCase): - client_class = ManualCapClient - - def setUp(self): - self.warning_catcher = warnings.catch_warnings() - self.warning_catcher.__enter__() - warnings.simplefilter('ignore', DeprecationWarning) - super(ManualClientTest, self).setUp() - - def tearDown(self): - super(ManualClientTest, self).tearDown() - self.warning_catcher.__exit__(None, None, None) - - class GeneratorClientTest(ClientTestMixin, AsyncTestCase): client_class = GeneratorCapClient diff --git a/tornado/test/iostream_test.py b/tornado/test/iostream_test.py index 51fbb02a9..c039f4183 100644 --- a/tornado/test/iostream_test.py +++ b/tornado/test/iostream_test.py @@ -5,13 +5,11 @@ from tornado import netutil from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError, _StreamBuffer from tornado.httputil import HTTPHeaders from tornado.locks import Condition, Event -from tornado.log import gen_log, app_log +from tornado.log import gen_log from tornado.netutil import ssl_wrap_socket -from tornado.stack_context import NullContext from tornado.tcpserver import TCPServer from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test # noqa: E501 -from tornado.test.util import (unittest, skipIfNonUnix, refusing_port, skipPypy3V58, - ignore_deprecation) +from tornado.test.util import unittest, skipIfNonUnix, refusing_port, skipPypy3V58 from tornado.web import RequestHandler, Application import errno import hashlib @@ -123,7 +121,7 @@ class TestIOStreamWebMixin(object): first_line = yield stream.read_until(b"\r\n") self.assertEqual(first_line, b"HTTP/1.1 200 OK\r\n") # callback=None is equivalent to no callback. - header_data = yield stream.read_until(b"\r\n\r\n", callback=None) + header_data = yield stream.read_until(b"\r\n\r\n") headers = HTTPHeaders.parse(header_data.decode('latin1')) content_length = int(headers['Content-Length']) body = yield stream.read_bytes(content_length) @@ -171,167 +169,6 @@ class TestReadWriteMixin(object): ws.close() rs.close() - @gen_test - def test_streaming_callback(self): - rs, ws = yield self.make_iostream_pair() - try: - chunks = [] - cond = Condition() - - def streaming_callback(data): - chunks.append(data) - cond.notify() - - with ignore_deprecation(): - fut = rs.read_bytes(6, streaming_callback=streaming_callback) - ws.write(b"1234") - while not chunks: - yield cond.wait() - ws.write(b"5678") - final_data = yield(fut) - self.assertFalse(final_data) - self.assertEqual(chunks, [b"1234", b"56"]) - - # the rest of the last chunk is still in the buffer - data = yield rs.read_bytes(2) - self.assertEqual(data, b"78") - finally: - rs.close() - ws.close() - - @gen_test - def test_streaming_callback_with_final_callback(self): - rs, ws = yield self.make_iostream_pair() - try: - chunks = [] - final_called = [] - cond = Condition() - - def streaming_callback(data): - chunks.append(data) - cond.notify() - - def final_callback(data): - self.assertFalse(data) - final_called.append(True) - cond.notify() - with ignore_deprecation(): - rs.read_bytes(6, callback=final_callback, - streaming_callback=streaming_callback) - ws.write(b"1234") - while not chunks: - yield cond.wait() - ws.write(b"5678") - while not final_called: - yield cond.wait() - self.assertEqual(chunks, [b"1234", b"56"]) - - # the rest of the last chunk is still in the buffer - data = yield rs.read_bytes(2) - self.assertEqual(data, b"78") - finally: - rs.close() - ws.close() - - @gen_test - def test_streaming_callback_with_data_in_buffer(self): - rs, ws = yield self.make_iostream_pair() - ws.write(b"abcd\r\nefgh") - data = yield rs.read_until(b"\r\n") - self.assertEqual(data, b"abcd\r\n") - - streaming_fut = Future() - with ignore_deprecation(): - rs.read_until_close(streaming_callback=streaming_fut.set_result) - data = yield streaming_fut - self.assertEqual(data, b"efgh") - rs.close() - ws.close() - - @gen_test - def test_streaming_until_close(self): - rs, ws = yield self.make_iostream_pair() - try: - chunks = [] - closed = [False] - cond = Condition() - - def streaming_callback(data): - chunks.append(data) - cond.notify() - - def close_callback(data): - assert not data, data - closed[0] = True - cond.notify() - with ignore_deprecation(): - rs.read_until_close(callback=close_callback, - streaming_callback=streaming_callback) - ws.write(b"1234") - while len(chunks) != 1: - yield cond.wait() - yield ws.write(b"5678") - ws.close() - while not closed[0]: - yield cond.wait() - self.assertEqual(chunks, [b"1234", b"5678"]) - finally: - ws.close() - rs.close() - - @gen_test - def test_streaming_until_close_future(self): - rs, ws = yield self.make_iostream_pair() - try: - chunks = [] - - @gen.coroutine - def rs_task(): - with ignore_deprecation(): - yield rs.read_until_close(streaming_callback=chunks.append) - - @gen.coroutine - def ws_task(): - yield ws.write(b"1234") - yield gen.sleep(0.01) - yield ws.write(b"5678") - ws.close() - - yield [rs_task(), ws_task()] - self.assertEqual(chunks, [b"1234", b"5678"]) - finally: - ws.close() - rs.close() - - @gen_test - def test_delayed_close_callback(self): - # The scenario: Server closes the connection while there is a pending - # read that can be served out of buffered data. The client does not - # run the close_callback as soon as it detects the close, but rather - # defers it until after the buffered read has finished. - rs, ws = yield self.make_iostream_pair() - try: - event = Event() - rs.set_close_callback(event.set) - ws.write(b"12") - chunks = [] - - def callback1(data): - chunks.append(data) - with ignore_deprecation(): - rs.read_bytes(1, callback2) - ws.close() - - def callback2(data): - chunks.append(data) - with ignore_deprecation(): - rs.read_bytes(1, callback1) - yield event.wait() # stopped by close_callback - self.assertEqual(chunks, [b"1", b"2"]) - finally: - ws.close() - rs.close() - @gen_test def test_future_delayed_close_callback(self): # Same as test_delayed_close_callback, but with the future interface. @@ -393,29 +230,6 @@ class TestReadWriteMixin(object): ws.close() rs.close() - @gen_test - def test_streaming_read_until_close_after_close(self): - # Same as the preceding test but with a streaming_callback. - # All data should go through the streaming callback, - # and the final read callback just gets an empty string. - rs, ws = yield self.make_iostream_pair() - try: - ws.write(b"1234") - ws.close() - data = yield rs.read_bytes(1) - self.assertEqual(data, b"1") - streaming_data = [] - final_future = Future() - with ignore_deprecation(): - rs.read_until_close(final_future.set_result, - streaming_callback=streaming_data.append) - final_data = yield final_future - self.assertEqual(b'', final_data) - self.assertEqual(b''.join(streaming_data), b"234") - finally: - ws.close() - rs.close() - @gen_test def test_large_read_until(self): # Performance test: read_until used to have a quadratic component @@ -553,25 +367,6 @@ class TestReadWriteMixin(object): ws.close() rs.close() - @gen_test - def test_read_until_max_bytes_inline_legacy(self): - rs, ws = yield self.make_iostream_pair() - closed = Event() - rs.set_close_callback(closed.set) - try: - # Similar to the error case in the previous test, but the - # ws writes first so rs reads are satisfied - # inline. For consistency with the out-of-line case, we - # do not raise the error synchronously. - ws.write(b"123456") - with ExpectLog(gen_log, "Unsatisfiable read"): - with ignore_deprecation(): - rs.read_until(b"def", callback=lambda x: self.fail(), max_bytes=5) - yield closed.wait() - finally: - ws.close() - rs.close() - @gen_test def test_read_until_max_bytes_inline(self): rs, ws = yield self.make_iostream_pair() @@ -874,33 +669,6 @@ class TestIOStreamMixin(TestReadWriteMixin): listener.close() raise gen.Return((server_stream, client_stream)) - def test_connection_refused_legacy(self): - # When a connection is refused, the connect callback should not - # be run. (The kqueue IOLoop used to behave differently from the - # epoll IOLoop in this respect) - cleanup_func, port = refusing_port() - self.addCleanup(cleanup_func) - stream = IOStream(socket.socket()) - self.connect_called = False - - def connect_callback(): - self.connect_called = True - self.stop() - stream.set_close_callback(self.stop) - # log messages vary by platform and ioloop implementation - with ExpectLog(gen_log, ".*", required=False): - with ignore_deprecation(): - stream.connect(("127.0.0.1", port), connect_callback) - self.wait() - self.assertFalse(self.connect_called) - self.assertTrue(isinstance(stream.error, socket.error), stream.error) - if sys.platform != 'cygwin': - _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,) - if hasattr(errno, "WSAECONNREFUSED"): - _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,) - # cygwin's errnos don't match those used on native windows python - self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED) - @gen_test def test_connection_refused(self): # When a connection is refused, the connect callback should not @@ -941,27 +709,6 @@ class TestIOStreamMixin(TestReadWriteMixin): yield stream.connect(('localhost', 80)) self.assertTrue(isinstance(stream.error, socket.gaierror)) - @gen_test - def test_read_callback_error(self): - # Test that IOStream sets its exc_info when a read callback throws - server, client = yield self.make_iostream_pair() - try: - closed = Event() - server.set_close_callback(closed.set) - with ExpectLog( - app_log, "(Uncaught exception|Exception in callback)" - ): - # Clear ExceptionStackContext so IOStream catches error - with NullContext(): - with ignore_deprecation(): - server.read_bytes(1, callback=lambda data: 1 / 0) - client.write(b"1") - yield closed.wait() - self.assertTrue(isinstance(server.error, ZeroDivisionError)) - finally: - server.close() - client.close() - @unittest.skipIf(mock is None, 'mock package not present') @gen_test def test_read_until_close_with_error(self): @@ -1228,27 +975,6 @@ class WaitForHandshakeTest(AsyncTestCase): if client is not None: client.close() - @gen_test - def test_wait_for_handshake_callback(self): - test = self - handshake_future = Future() - - class TestServer(TCPServer): - def handle_stream(self, stream, address): - # The handshake has not yet completed. - test.assertIsNone(stream.socket.cipher()) - self.stream = stream - with ignore_deprecation(): - stream.wait_for_handshake(self.handshake_done) - - def handshake_done(self): - # Now the handshake is done and ssl information is available. - test.assertIsNotNone(self.stream.socket.cipher()) - handshake_future.set_result(None) - - yield self.connect_to_server(TestServer) - yield handshake_future - @gen_test def test_wait_for_handshake_future(self): test = self