]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
iostream: Remove deprecated interfaces
authorBen Darnell <ben@bendarnell.com>
Sat, 7 Jul 2018 04:13:36 +0000 (00:13 -0400)
committerBen Darnell <ben@bendarnell.com>
Sat, 14 Jul 2018 20:58:48 +0000 (16:58 -0400)
This is the riskiest change in this series since it does some internal
refactoring to get rid of some callback-related machinery.

tornado/iostream.py
tornado/test/concurrent_test.py
tornado/test/iostream_test.py

index 89e1e234e4dfb143639b89778b24ed66c5d5735a..7d627483e24dc538a5638fe3ff7ffdbef6f746e4 100644 (file)
@@ -31,9 +31,9 @@ import io
 import numbers
 import os
 import socket
+import ssl
 import sys
 import re
-import warnings
 
 from tornado.concurrent import Future
 from tornado import ioloop
@@ -47,12 +47,6 @@ try:
 except ImportError:
     _set_nonblocking = None
 
-try:
-    import ssl
-except ImportError:
-    # ssl is not available on Google App Engine
-    ssl = None
-
 # These errnos indicate that a non-blocking operation must be retried
 # at a later time.  On most platforms they're the same value, but on
 # some they differ.
@@ -211,20 +205,19 @@ class _StreamBuffer(object):
 class BaseIOStream(object):
     """A utility class to write to and read from a non-blocking file or socket.
 
-    We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
-    All of the methods take an optional ``callback`` argument and return a
-    `.Future` only if no callback is given.  When the operation completes,
-    the callback will be run or the `.Future` will resolve with the data
-    read (or ``None`` for ``write()``).  All outstanding ``Futures`` will
-    resolve with a `StreamClosedError` when the stream is closed; users
-    of the callback interface will be notified via
-    `.BaseIOStream.set_close_callback` instead.
+    We support a non-blocking ``write()`` and a family of ``read_*()``
+    methods. When the operation completes, the `.Future` will resolve
+    with the data read (or ``None`` for ``write()``). All outstanding
+    ``Futures`` will resolve with a `StreamClosedError` when the
+    stream is closed; `.BaseIOStream.set_close_callback` can also be used
+    to be notified of a closed stream.
 
     When a stream is closed due to an error, the IOStream's ``error``
     attribute contains the exception object.
 
     Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
     `read_from_fd`, and optionally `get_fd_error`.
+
     """
     def __init__(self, max_buffer_size=None,
                  read_chunk_size=None, max_write_buffer_size=None):
@@ -266,22 +259,17 @@ class BaseIOStream(object):
         self._read_bytes = None
         self._read_partial = False
         self._read_until_close = False
-        self._read_callback = None
         self._read_future = None
-        self._streaming_callback = None
-        self._write_callback = None
         self._write_futures = collections.deque()
         self._close_callback = None
-        self._connect_callback = None
         self._connect_future = None
         # _ssl_connect_future should be defined in SSLIOStream
-        # but it's here so we can clean it up in maybe_run_close_callback.
+        # but it's here so we can clean it up in _signal_closed
         # TODO: refactor that so subclasses can add additional futures
         # to be cancelled.
         self._ssl_connect_future = None
         self._connecting = False
         self._state = None
-        self._pending_callbacks = 0
         self._closed = False
 
     def fileno(self):
@@ -328,13 +316,11 @@ class BaseIOStream(object):
         """
         return None
 
-    def read_until_regex(self, regex, callback=None, max_bytes=None):
+    def read_until_regex(self, regex, max_bytes=None):
         """Asynchronously read until we have matched the given regex.
 
         The result includes the data that matches the regex and anything
-        that came before it.  If a callback is given, it will be run
-        with the data as an argument; if not, this method returns a
-        `.Future`.
+        that came before it.
 
         If ``max_bytes`` is not None, the connection will be closed
         if more than ``max_bytes`` bytes have been read and the regex is
@@ -344,13 +330,13 @@ class BaseIOStream(object):
             Added the ``max_bytes`` argument.  The ``callback`` argument is
             now optional and a `.Future` will be returned if it is omitted.
 
-        .. deprecated:: 5.1
+        .. versionchanged:: 6.0
 
-           The ``callback`` argument is deprecated and will be removed
-           in Tornado 6.0. Use the returned `.Future` instead.
+           The ``callback`` argument was removed. Use the returned
+           `.Future` instead.
 
         """
-        future = self._set_read_callback(callback)
+        future = self._start_read()
         self._read_regex = re.compile(regex)
         self._read_max_bytes = max_bytes
         try:
@@ -361,19 +347,16 @@ class BaseIOStream(object):
             self.close(exc_info=e)
             return future
         except:
-            if future is not None:
-                # Ensure that the future doesn't log an error because its
-                # failure was never examined.
-                future.add_done_callback(lambda f: f.exception())
+            # Ensure that the future doesn't log an error because its
+            # failure was never examined.
+            future.add_done_callback(lambda f: f.exception())
             raise
         return future
 
-    def read_until(self, delimiter, callback=None, max_bytes=None):
+    def read_until(self, delimiter, max_bytes=None):
         """Asynchronously read until we have found the given delimiter.
 
         The result includes all the data read including the delimiter.
-        If a callback is given, it will be run with the data as an argument;
-        if not, this method returns a `.Future`.
 
         If ``max_bytes`` is not None, the connection will be closed
         if more than ``max_bytes`` bytes have been read and the delimiter
@@ -383,12 +366,12 @@ class BaseIOStream(object):
             Added the ``max_bytes`` argument.  The ``callback`` argument is
             now optional and a `.Future` will be returned if it is omitted.
 
-        .. deprecated:: 5.1
+        .. versionchanged:: 6.0
 
-           The ``callback`` argument is deprecated and will be removed
-           in Tornado 6.0. Use the returned `.Future` instead.
+           The ``callback`` argument was removed. Use the returned
+           `.Future` instead.
         """
-        future = self._set_read_callback(callback)
+        future = self._start_read()
         self._read_delimiter = delimiter
         self._read_max_bytes = max_bytes
         try:
@@ -399,58 +382,42 @@ class BaseIOStream(object):
             self.close(exc_info=e)
             return future
         except:
-            if future is not None:
-                future.add_done_callback(lambda f: f.exception())
+            future.add_done_callback(lambda f: f.exception())
             raise
         return future
 
-    def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
-                   partial=False):
+    def read_bytes(self, num_bytes, partial=False):
         """Asynchronously read a number of bytes.
 
-        If a ``streaming_callback`` is given, it will be called with chunks
-        of data as they become available, and the final result will be empty.
-        Otherwise, the result is all the data that was read.
-        If a callback is given, it will be run with the data as an argument;
-        if not, this method returns a `.Future`.
-
-        If ``partial`` is true, the callback is run as soon as we have
+        If ``partial`` is true, data is returned as soon as we have
         any bytes to return (but never more than ``num_bytes``)
 
         .. versionchanged:: 4.0
             Added the ``partial`` argument.  The callback argument is now
             optional and a `.Future` will be returned if it is omitted.
 
-        .. deprecated:: 5.1
+        .. versionchanged:: 6.0
 
-           The ``callback`` and ``streaming_callback`` arguments are
-           deprecated and will be removed in Tornado 6.0. Use the
-           returned `.Future` (and ``partial=True`` for
-           ``streaming_callback``) instead.
+           The ``callback`` and ``streaming_callback`` arguments have
+           been removed. Use the returned `.Future` (and
+           ``partial=True`` for ``streaming_callback``) instead.
 
         """
-        future = self._set_read_callback(callback)
+        future = self._start_read()
         assert isinstance(num_bytes, numbers.Integral)
         self._read_bytes = num_bytes
         self._read_partial = partial
-        if streaming_callback is not None:
-            warnings.warn("streaming_callback is deprecated, use partial instead",
-                          DeprecationWarning)
-            self._streaming_callback = stack_context.wrap(streaming_callback)
         try:
             self._try_inline_read()
         except:
-            if future is not None:
-                future.add_done_callback(lambda f: f.exception())
+            future.add_done_callback(lambda f: f.exception())
             raise
         return future
 
-    def read_into(self, buf, callback=None, partial=False):
+    def read_into(self, buf, partial=False):
         """Asynchronously read a number of bytes.
 
         ``buf`` must be a writable buffer into which data will be read.
-        If a callback is given, it will be run with the number of read
-        bytes as an argument; if not, this method returns a `.Future`.
 
         If ``partial`` is true, the callback is run as soon as any bytes
         have been read.  Otherwise, it is run when the ``buf`` has been
@@ -458,13 +425,13 @@ class BaseIOStream(object):
 
         .. versionadded:: 5.0
 
-        .. deprecated:: 5.1
+        .. versionchanged:: 6.0
 
-           The ``callback`` argument is deprecated and will be removed
-           in Tornado 6.0. Use the returned `.Future` instead.
+           The ``callback`` argument was removed. Use the returned
+           `.Future` instead.
 
         """
-        future = self._set_read_callback(callback)
+        future = self._start_read()
 
         # First copy data already in read buffer
         available_bytes = self._read_buffer_size
@@ -490,68 +457,45 @@ class BaseIOStream(object):
         try:
             self._try_inline_read()
         except:
-            if future is not None:
-                future.add_done_callback(lambda f: f.exception())
+            future.add_done_callback(lambda f: f.exception())
             raise
         return future
 
-    def read_until_close(self, callback=None, streaming_callback=None):
+    def read_until_close(self):
         """Asynchronously reads all data from the socket until it is closed.
 
-        If a ``streaming_callback`` is given, it will be called with chunks
-        of data as they become available, and the final result will be empty.
-        Otherwise, the result is all the data that was read.
-        If a callback is given, it will be run with the data as an argument;
-        if not, this method returns a `.Future`.
-
-        Note that if a ``streaming_callback`` is used, data will be
-        read from the socket as quickly as it becomes available; there
-        is no way to apply backpressure or cancel the reads. If flow
-        control or cancellation are desired, use a loop with
-        `read_bytes(partial=True) <.read_bytes>` instead.
+        This will buffer all available data until ``max_buffer_size``
+        is reached. If flow control or cancellation are desired, use a
+        loop with `read_bytes(partial=True) <.read_bytes>` instead.
 
         .. versionchanged:: 4.0
             The callback argument is now optional and a `.Future` will
             be returned if it is omitted.
 
-        .. deprecated:: 5.1
+        .. versionchanged:: 6.0
 
-           The ``callback`` and ``streaming_callback`` arguments are
-           deprecated and will be removed in Tornado 6.0. Use the
-           returned `.Future` (and `read_bytes` with ``partial=True``
-           for ``streaming_callback``) instead.
+           The ``callback`` and ``streaming_callback`` arguments have
+           been removed. Use the returned `.Future` (and `read_bytes`
+           with ``partial=True`` for ``streaming_callback``) instead.
 
         """
-        future = self._set_read_callback(callback)
-        if streaming_callback is not None:
-            warnings.warn("streaming_callback is deprecated, use read_bytes(partial=True) instead",
-                          DeprecationWarning)
-            self._streaming_callback = stack_context.wrap(streaming_callback)
+        future = self._start_read()
         if self.closed():
-            if self._streaming_callback is not None:
-                self._run_read_callback(self._read_buffer_size, True)
-            self._run_read_callback(self._read_buffer_size, False)
+            self._finish_read(self._read_buffer_size, False)
             return future
         self._read_until_close = True
         try:
             self._try_inline_read()
         except:
-            if future is not None:
-                future.add_done_callback(lambda f: f.exception())
+            future.add_done_callback(lambda f: f.exception())
             raise
         return future
 
-    def write(self, data, callback=None):
+    def write(self, data):
         """Asynchronously write the given data to this stream.
 
-        If ``callback`` is given, we call it when all of the buffered write
-        data has been successfully written to the stream. If there was
-        previously buffered write data and an old write callback, that
-        callback is simply overwritten with this new callback.
-
-        If no ``callback`` is given, this method returns a `.Future` that
-        resolves (with a result of ``None``) when the write has been
-        completed.
+        This method returns a `.Future` that resolves (with a result
+        of ``None``) when the write has been completed.
 
         The ``data`` argument may be of type `bytes` or `memoryview`.
 
@@ -561,10 +505,10 @@ class BaseIOStream(object):
         .. versionchanged:: 4.5
             Added support for `memoryview` arguments.
 
-        .. deprecated:: 5.1
+        .. versionchanged:: 6.0
 
-           The ``callback`` argument is deprecated and will be removed
-           in Tornado 6.0. Use the returned `.Future` instead.
+           The ``callback`` argument was removed. Use the returned
+           `.Future` instead.
 
         """
         self._check_closed()
@@ -574,15 +518,9 @@ class BaseIOStream(object):
                 raise StreamBufferFullError("Reached maximum write buffer size")
             self._write_buffer.append(data)
             self._total_write_index += len(data)
-        if callback is not None:
-            warnings.warn("callback argument is deprecated, use returned Future instead",
-                          DeprecationWarning)
-            self._write_callback = stack_context.wrap(callback)
-            future = None
-        else:
-            future = Future()
-            future.add_done_callback(lambda f: f.exception())
-            self._write_futures.append((self._total_write_index, future))
+        future = Future()
+        future.add_done_callback(lambda f: f.exception())
+        self._write_futures.append((self._total_write_index, future))
         if not self._connecting:
             self._handle_write()
             if self._write_buffer:
@@ -600,7 +538,7 @@ class BaseIOStream(object):
         closed while no other read or write is in progress.
 
         Unlike other callback-based interfaces, ``set_close_callback``
-        will not be removed in Tornado 6.0.
+        was not removed in Tornado 6.0.
         """
         self._close_callback = stack_context.wrap(callback)
         self._maybe_add_error_listener()
@@ -623,51 +561,46 @@ class BaseIOStream(object):
                     if any(exc_info):
                         self.error = exc_info[1]
             if self._read_until_close:
-                if (self._streaming_callback is not None and
-                        self._read_buffer_size):
-                    self._run_read_callback(self._read_buffer_size, True)
                 self._read_until_close = False
-                self._run_read_callback(self._read_buffer_size, False)
+                self._finish_read(self._read_buffer_size, False)
             if self._state is not None:
                 self.io_loop.remove_handler(self.fileno())
                 self._state = None
             self.close_fd()
             self._closed = True
-        self._maybe_run_close_callback()
-
-    def _maybe_run_close_callback(self):
-        # If there are pending callbacks, don't run the close callback
-        # until they're done (see _maybe_add_error_handler)
-        if self.closed() and self._pending_callbacks == 0:
-            futures = []
-            if self._read_future is not None:
-                futures.append(self._read_future)
-                self._read_future = None
-            futures += [future for _, future in self._write_futures]
-            self._write_futures.clear()
-            if self._connect_future is not None:
-                futures.append(self._connect_future)
-                self._connect_future = None
-            if self._ssl_connect_future is not None:
-                futures.append(self._ssl_connect_future)
-                self._ssl_connect_future = None
-            for future in futures:
-                future.set_exception(StreamClosedError(real_error=self.error))
-                future.exception()
-            if self._close_callback is not None:
-                cb = self._close_callback
-                self._close_callback = None
-                self._run_callback(cb)
-            # Delete any unfinished callbacks to break up reference cycles.
-            self._read_callback = self._write_callback = None
-            # Clear the buffers so they can be cleared immediately even
-            # if the IOStream object is kept alive by a reference cycle.
-            # TODO: Clear the read buffer too; it currently breaks some tests.
-            self._write_buffer = None
+        self._signal_closed()
+
+    def _signal_closed(self):
+        futures = []
+        if self._read_future is not None:
+            futures.append(self._read_future)
+            self._read_future = None
+        futures += [future for _, future in self._write_futures]
+        self._write_futures.clear()
+        if self._connect_future is not None:
+            futures.append(self._connect_future)
+            self._connect_future = None
+        for future in futures:
+            future.set_exception(StreamClosedError(real_error=self.error))
+            future.exception()
+        if self._ssl_connect_future is not None:
+            # _ssl_connect_future expects to see the real exception (typically
+            # an ssl.SSLError), not just StreamClosedError.
+            self._ssl_connect_future.set_exception(self.error)
+            self._ssl_connect_future.exception()
+            self._ssl_connect_future = None
+        if self._close_callback is not None:
+            cb = self._close_callback
+            self._close_callback = None
+            self.io_loop.add_callback(cb)
+        # Clear the buffers so they can be cleared immediately even
+        # if the IOStream object is kept alive by a reference cycle.
+        # TODO: Clear the read buffer too; it currently breaks some tests.
+        self._write_buffer = None
 
     def reading(self):
         """Returns true if we are currently reading from the stream."""
-        return self._read_callback is not None or self._read_future is not None
+        return self._read_future is not None
 
     def writing(self):
         """Returns true if we are currently writing to the stream."""
@@ -745,100 +678,47 @@ class BaseIOStream(object):
             self.close(exc_info=e)
             raise
 
-    def _run_callback(self, callback, *args):
-        def wrapper():
-            self._pending_callbacks -= 1
-            try:
-                return callback(*args)
-            except Exception as e:
-                app_log.error("Uncaught exception, closing connection.",
-                              exc_info=True)
-                # Close the socket on an uncaught exception from a user callback
-                # (It would eventually get closed when the socket object is
-                # gc'd, but we don't want to rely on gc happening before we
-                # run out of file descriptors)
-                self.close(exc_info=e)
-                # Re-raise the exception so that IOLoop.handle_callback_exception
-                # can see it and log the error
-                raise
-            finally:
-                self._maybe_add_error_listener()
-        # We schedule callbacks to be run on the next IOLoop iteration
-        # rather than running them directly for several reasons:
-        # * Prevents unbounded stack growth when a callback calls an
-        #   IOLoop operation that immediately runs another callback
-        # * Provides a predictable execution context for e.g.
-        #   non-reentrant mutexes
-        # * Ensures that the try/except in wrapper() is run outside
-        #   of the application's StackContexts
-        with stack_context.NullContext():
-            # stack_context was already captured in callback, we don't need to
-            # capture it again for IOStream's wrapper.  This is especially
-            # important if the callback was pre-wrapped before entry to
-            # IOStream (as in HTTPConnection._header_callback), as we could
-            # capture and leak the wrong context here.
-            self._pending_callbacks += 1
-            self.io_loop.add_callback(wrapper)
-
     def _read_to_buffer_loop(self):
         # This method is called from _handle_read and _try_inline_read.
-        try:
-            if self._read_bytes is not None:
-                target_bytes = self._read_bytes
-            elif self._read_max_bytes is not None:
-                target_bytes = self._read_max_bytes
-            elif self.reading():
-                # For read_until without max_bytes, or
-                # read_until_close, read as much as we can before
-                # scanning for the delimiter.
-                target_bytes = None
-            else:
-                target_bytes = 0
-            next_find_pos = 0
-            # Pretend to have a pending callback so that an EOF in
-            # _read_to_buffer doesn't trigger an immediate close
-            # callback.  At the end of this method we'll either
-            # establish a real pending callback via
-            # _read_from_buffer or run the close callback.
-            #
-            # We need two try statements here so that
-            # pending_callbacks is decremented before the `except`
-            # clause below (which calls `close` and does need to
-            # trigger the callback)
-            self._pending_callbacks += 1
-            while not self.closed():
-                # Read from the socket until we get EWOULDBLOCK or equivalent.
-                # SSL sockets do some internal buffering, and if the data is
-                # sitting in the SSL object's buffer select() and friends
-                # can't see it; the only way to find out if it's there is to
-                # try to read it.
-                if self._read_to_buffer() == 0:
-                    break
+        if self._read_bytes is not None:
+            target_bytes = self._read_bytes
+        elif self._read_max_bytes is not None:
+            target_bytes = self._read_max_bytes
+        elif self.reading():
+            # For read_until without max_bytes, or
+            # read_until_close, read as much as we can before
+            # scanning for the delimiter.
+            target_bytes = None
+        else:
+            target_bytes = 0
+        next_find_pos = 0
+        while not self.closed():
+            # Read from the socket until we get EWOULDBLOCK or equivalent.
+            # SSL sockets do some internal buffering, and if the data is
+            # sitting in the SSL object's buffer select() and friends
+            # can't see it; the only way to find out if it's there is to
+            # try to read it.
+            if self._read_to_buffer() == 0:
+                break
 
-                self._run_streaming_callback()
+            # If we've read all the bytes we can use, break out of
+            # this loop.
 
-                # If we've read all the bytes we can use, break out of
-                # this loop.  We can't just call read_from_buffer here
-                # because of subtle interactions with the
-                # pending_callback and error_listener mechanisms.
-                #
-                # If we've reached target_bytes, we know we're done.
-                if (target_bytes is not None and
-                        self._read_buffer_size >= target_bytes):
-                    break
+            # If we've reached target_bytes, we know we're done.
+            if (target_bytes is not None and
+                    self._read_buffer_size >= target_bytes):
+                break
 
-                # Otherwise, we need to call the more expensive find_read_pos.
-                # It's inefficient to do this on every read, so instead
-                # do it on the first read and whenever the read buffer
-                # size has doubled.
-                if self._read_buffer_size >= next_find_pos:
-                    pos = self._find_read_pos()
-                    if pos is not None:
-                        return pos
-                    next_find_pos = self._read_buffer_size * 2
-            return self._find_read_pos()
-        finally:
-            self._pending_callbacks -= 1
+            # Otherwise, we need to call the more expensive find_read_pos.
+            # It's inefficient to do this on every read, so instead
+            # do it on the first read and whenever the read buffer
+            # size has doubled.
+            if self._read_buffer_size >= next_find_pos:
+                pos = self._find_read_pos()
+                if pos is not None:
+                    return pos
+                next_find_pos = self._read_buffer_size * 2
+        return self._find_read_pos()
 
     def _handle_read(self):
         try:
@@ -851,22 +731,13 @@ class BaseIOStream(object):
             return
         if pos is not None:
             self._read_from_buffer(pos)
-            return
-        else:
-            self._maybe_run_close_callback()
 
-    def _set_read_callback(self, callback):
-        assert self._read_callback is None, "Already reading"
+    def _start_read(self):
         assert self._read_future is None, "Already reading"
-        if callback is not None:
-            warnings.warn("callbacks are deprecated, use returned Future instead",
-                          DeprecationWarning)
-            self._read_callback = stack_context.wrap(callback)
-        else:
-            self._read_future = Future()
+        self._read_future = Future()
         return self._read_future
 
-    def _run_read_callback(self, size, streaming):
+    def _finish_read(self, size, streaming):
         if self._user_read_buffer:
             self._read_buffer = self._after_user_read_buffer or bytearray()
             self._after_user_read_buffer = None
@@ -876,24 +747,11 @@ class BaseIOStream(object):
             result = size
         else:
             result = self._consume(size)
-        if streaming:
-            callback = self._streaming_callback
-        else:
-            callback = self._read_callback
-            self._read_callback = self._streaming_callback = None
-            if self._read_future is not None:
-                assert callback is None
-                future = self._read_future
-                self._read_future = None
-
-                future.set_result(result)
-        if callback is not None:
-            assert (self._read_future is None) or streaming
-            self._run_callback(callback, result)
-        else:
-            # If we scheduled a callback, we will add the error listener
-            # afterwards.  If we didn't, we have to do it now.
-            self._maybe_add_error_listener()
+        if self._read_future is not None:
+            future = self._read_future
+            self._read_future = None
+            future.set_result(result)
+        self._maybe_add_error_listener()
 
     def _try_inline_read(self):
         """Attempt to complete the current read operation from buffered data.
@@ -903,29 +761,18 @@ class BaseIOStream(object):
         listening for reads on the socket.
         """
         # See if we've already got the data from a previous read
-        self._run_streaming_callback()
         pos = self._find_read_pos()
         if pos is not None:
             self._read_from_buffer(pos)
             return
         self._check_closed()
-        try:
-            pos = self._read_to_buffer_loop()
-        except Exception:
-            # If there was an in _read_to_buffer, we called close() already,
-            # but couldn't run the close callback because of _pending_callbacks.
-            # Before we escape from this function, run the close callback if
-            # applicable.
-            self._maybe_run_close_callback()
-            raise
+        pos = self._read_to_buffer_loop()
         if pos is not None:
             self._read_from_buffer(pos)
             return
-        # We couldn't satisfy the read inline, so either close the stream
-        # or listen for new data.
-        if self.closed():
-            self._maybe_run_close_callback()
-        else:
+        # We couldn't satisfy the read inline, so make sure we're
+        # listening for new data unless the stream is closed.
+        if not self.closed():
             self._add_io_state(ioloop.IOLoop.READ)
 
     def _read_to_buffer(self):
@@ -974,14 +821,6 @@ class BaseIOStream(object):
             raise StreamBufferFullError("Reached maximum read buffer size")
         return bytes_read
 
-    def _run_streaming_callback(self):
-        if self._streaming_callback is not None and self._read_buffer_size:
-            bytes_to_consume = self._read_buffer_size
-            if self._read_bytes is not None:
-                bytes_to_consume = min(self._read_bytes, bytes_to_consume)
-                self._read_bytes -= bytes_to_consume
-            self._run_read_callback(bytes_to_consume, True)
-
     def _read_from_buffer(self, pos):
         """Attempts to complete the currently-pending read from the buffer.
 
@@ -990,7 +829,7 @@ class BaseIOStream(object):
         """
         self._read_bytes = self._read_delimiter = self._read_regex = None
         self._read_partial = False
-        self._run_read_callback(pos, False)
+        self._finish_read(pos, False)
 
     def _find_read_pos(self):
         """Attempts to find a position in the read buffer that satisfies
@@ -1082,12 +921,6 @@ class BaseIOStream(object):
             self._write_futures.popleft()
             future.set_result(None)
 
-        if not len(self._write_buffer):
-            if self._write_callback:
-                callback = self._write_callback
-                self._write_callback = None
-                self._run_callback(callback)
-
     def _consume(self, loc):
         # Consume loc bytes from the read buffer and return them
         if loc == 0:
@@ -1118,13 +951,10 @@ class BaseIOStream(object):
         # connection is first established because we are going to read or write
         # immediately anyway.  Instead, we insert checks at various times to
         # see if the connection is idle and add the read listener then.
-        if self._pending_callbacks != 0:
-            return
         if self._state is None or self._state == ioloop.IOLoop.ERROR:
-            if self.closed():
-                self._maybe_run_close_callback()
-            elif (self._read_buffer_size == 0 and
-                  self._close_callback is not None):
+            if (not self.closed() and
+                    self._read_buffer_size == 0 and
+                    self._close_callback is not None):
                 self._add_io_state(ioloop.IOLoop.READ)
 
     def _add_io_state(self, state):
@@ -1133,20 +963,18 @@ class BaseIOStream(object):
         Implementation notes: Reads and writes have a fast path and a
         slow path.  The fast path reads synchronously from socket
         buffers, while the slow path uses `_add_io_state` to schedule
-        an IOLoop callback.  Note that in both cases, the callback is
-        run asynchronously with `_run_callback`.
+        an IOLoop callback.
 
         To detect closed connections, we must have called
         `_add_io_state` at some point, but we want to delay this as
         much as possible so we don't have to set an `IOLoop.ERROR`
         listener that will be overwritten by the next slow-path
-        operation.  As long as there are callbacks scheduled for
-        fast-path ops, those callbacks may do more reads.
-        If a sequence of fast-path ops do not end in a slow-path op,
-        (e.g. for an @asynchronous long-poll request), we must add
-        the error handler.  This is done in `_run_callback` and `write`
-        (since the write callback is optional so we can have a
-        fast-path write with no `_run_callback`)
+        operation. If a sequence of fast-path ops do not end in a
+        slow-path op, (e.g. for an @asynchronous long-poll request),
+        we must add the error handler.
+
+        TODO: reevaluate this now that callbacks are gone.
+
         """
         if self.closed():
             # connection has been closed, so there can be no future events
@@ -1252,7 +1080,7 @@ class IOStream(BaseIOStream):
             # See https://github.com/tornadoweb/tornado/pull/2008
             del data
 
-    def connect(self, address, callback=None, server_hostname=None):
+    def connect(self, address, server_hostname=None):
         """Connects the socket to a remote address without blocking.
 
         May only be called if the socket passed to the constructor was
@@ -1291,20 +1119,14 @@ class IOStream(BaseIOStream):
            suitably-configured `ssl.SSLContext` to the
            `SSLIOStream` constructor to disable.
 
-        .. deprecated:: 5.1
+        .. versionchanged:: 6.0
 
-           The ``callback`` argument is deprecated and will be removed
-           in Tornado 6.0. Use the returned `.Future` instead.
+           The ``callback`` argument was removed. Use the returned
+           `.Future` instead.
 
         """
         self._connecting = True
-        if callback is not None:
-            warnings.warn("callback argument is deprecated, use returned Future instead",
-                          DeprecationWarning)
-            self._connect_callback = stack_context.wrap(callback)
-            future = None
-        else:
-            future = self._connect_future = Future()
+        future = self._connect_future = Future()
         try:
             self.socket.connect(address)
         except socket.error as e:
@@ -1360,10 +1182,10 @@ class IOStream(BaseIOStream):
            ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
            suitably-configured `ssl.SSLContext` to disable.
         """
-        if (self._read_callback or self._read_future or
-                self._write_callback or self._write_futures or
-                self._connect_callback or self._connect_future or
-                self._pending_callbacks or self._closed or
+        if (self._read_future or
+                self._write_futures or
+                self._connect_future or
+                self._closed or
                 self._read_buffer or self._write_buffer):
             raise ValueError("IOStream is not idle; cannot convert to SSL")
         if ssl_options is None:
@@ -1384,27 +1206,8 @@ class IOStream(BaseIOStream):
 
         future = Future()
         ssl_stream = SSLIOStream(socket, ssl_options=ssl_options)
-        # Wrap the original close callback so we can fail our Future as well.
-        # If we had an "unwrap" counterpart to this method we would need
-        # to restore the original callback after our Future resolves
-        # so that repeated wrap/unwrap calls don't build up layers.
-
-        def close_callback():
-            if not future.done():
-                # Note that unlike most Futures returned by IOStream,
-                # this one passes the underlying error through directly
-                # instead of wrapping everything in a StreamClosedError
-                # with a real_error attribute. This is because once the
-                # connection is established it's more helpful to raise
-                # the SSLError directly than to hide it behind a
-                # StreamClosedError (and the client is expecting SSL
-                # issues rather than network issues since this method is
-                # named start_tls).
-                future.set_exception(ssl_stream.error or StreamClosedError())
-            if orig_close_callback is not None:
-                orig_close_callback()
-        ssl_stream.set_close_callback(close_callback)
-        ssl_stream._ssl_connect_callback = lambda: future.set_result(ssl_stream)
+        ssl_stream.set_close_callback(orig_close_callback)
+        ssl_stream._ssl_connect_future = future
         ssl_stream.max_buffer_size = self.max_buffer_size
         ssl_stream.read_chunk_size = self.read_chunk_size
         return future
@@ -1428,10 +1231,6 @@ class IOStream(BaseIOStream):
                                 self.socket.fileno(), errno.errorcode[err])
             self.close()
             return
-        if self._connect_callback is not None:
-            callback = self._connect_callback
-            self._connect_callback = None
-            self._run_callback(callback)
         if self._connect_future is not None:
             future = self._connect_future
             self._connect_future = None
@@ -1473,7 +1272,6 @@ class SSLIOStream(IOStream):
         self._ssl_accepting = True
         self._handshake_reading = False
         self._handshake_writing = False
-        self._ssl_connect_callback = None
         self._server_hostname = None
 
         # If the socket is already connected, attempt to start the handshake.
@@ -1537,13 +1335,9 @@ class SSLIOStream(IOStream):
             if not self._verify_cert(self.socket.getpeercert()):
                 self.close()
                 return
-            self._run_ssl_connect_callback()
+            self._finish_ssl_connect()
 
-    def _run_ssl_connect_callback(self):
-        if self._ssl_connect_callback is not None:
-            callback = self._ssl_connect_callback
-            self._ssl_connect_callback = None
-            self._run_callback(callback)
+    def _finish_ssl_connect(self):
         if self._ssl_connect_future is not None:
             future = self._ssl_connect_future
             self._ssl_connect_future = None
@@ -1588,16 +1382,22 @@ class SSLIOStream(IOStream):
             return
         super(SSLIOStream, self)._handle_write()
 
-    def connect(self, address, callback=None, server_hostname=None):
+    def connect(self, address, server_hostname=None):
         self._server_hostname = server_hostname
         # Ignore the result of connect(). If it fails,
         # wait_for_handshake will raise an error too. This is
         # necessary for the old semantics of the connect callback
         # (which takes no arguments). In 6.0 this can be refactored to
         # be a regular coroutine.
+        # TODO: This is trickier than it looks, since if write()
+        # is called with a connect() pending, we want the connect
+        # to resolve before the write. Or do we care about this?
+        # (There's a test for it, but I think in practice users
+        # either wait for the connect before performing a write or
+        # they don't care about the connect Future at all)
         fut = super(SSLIOStream, self).connect(address)
         fut.add_done_callback(lambda f: f.exception())
-        return self.wait_for_handshake(callback)
+        return self.wait_for_handshake()
 
     def _handle_connect(self):
         # Call the superclass method to check for errors.
@@ -1622,7 +1422,7 @@ class SSLIOStream(IOStream):
                                       do_handshake_on_connect=False)
         self._add_io_state(old_state)
 
-    def wait_for_handshake(self, callback=None):
+    def wait_for_handshake(self):
         """Wait for the initial SSL handshake to complete.
 
         If a ``callback`` is given, it will be called with no
@@ -1641,24 +1441,17 @@ class SSLIOStream(IOStream):
 
         .. versionadded:: 4.2
 
-        .. deprecated:: 5.1
+        .. versionchanged:: 6.0
 
-           The ``callback`` argument is deprecated and will be removed
-           in Tornado 6.0. Use the returned `.Future` instead.
+           The ``callback`` argument was removed. Use the returned
+           `.Future` instead.
 
         """
-        if (self._ssl_connect_callback is not None or
-                self._ssl_connect_future is not None):
+        if self._ssl_connect_future is not None:
             raise RuntimeError("Already waiting")
-        if callback is not None:
-            warnings.warn("callback argument is deprecated, use returned Future instead",
-                          DeprecationWarning)
-            self._ssl_connect_callback = stack_context.wrap(callback)
-            future = None
-        else:
-            future = self._ssl_connect_future = Future()
+        future = self._ssl_connect_future = Future()
         if not self._ssl_accepting:
-            self._run_ssl_connect_callback()
+            self._finish_ssl_connect()
         return future
 
     def write_to_fd(self, data):
index 6bdecf5449b57ac5d0b04602d4b8624977e3a222..99402d167ea9695642a4911196141efe591df6ac 100644 (file)
@@ -85,33 +85,6 @@ class BaseCapClient(object):
             raise CapError(message)
 
 
-class ManualCapClient(BaseCapClient):
-    def capitalize(self, request_data, callback=None):
-        logging.debug("capitalize")
-        self.request_data = request_data
-        self.stream = IOStream(socket.socket())
-        self.stream.connect(('127.0.0.1', self.port),
-                            callback=self.handle_connect)
-        self.future = Future()
-        if callback is not None:
-            self.future.add_done_callback(
-                stack_context.wrap(lambda future: callback(future.result())))
-        return self.future
-
-    def handle_connect(self):
-        logging.debug("handle_connect")
-        self.stream.write(utf8(self.request_data + "\n"))
-        self.stream.read_until(b'\n', callback=self.handle_read)
-
-    def handle_read(self, data):
-        logging.debug("handle_read")
-        self.stream.close()
-        try:
-            self.future.set_result(self.process_response(data))
-        except CapError as e:
-            self.future.set_exception(e)
-
-
 class GeneratorCapClient(BaseCapClient):
     @gen.coroutine
     def capitalize(self, request_data):
@@ -166,20 +139,6 @@ class ClientTestMixin(object):
         self.io_loop.run_sync(f)
 
 
-class ManualClientTest(ClientTestMixin, AsyncTestCase):
-    client_class = ManualCapClient
-
-    def setUp(self):
-        self.warning_catcher = warnings.catch_warnings()
-        self.warning_catcher.__enter__()
-        warnings.simplefilter('ignore', DeprecationWarning)
-        super(ManualClientTest, self).setUp()
-
-    def tearDown(self):
-        super(ManualClientTest, self).tearDown()
-        self.warning_catcher.__exit__(None, None, None)
-
-
 class GeneratorClientTest(ClientTestMixin, AsyncTestCase):
     client_class = GeneratorCapClient
 
index 51fbb02a99c7a0f3116e9ea19e3b7f68e323ec85..c039f41835a201472f6c3d3dedc69d928e92a148 100644 (file)
@@ -5,13 +5,11 @@ from tornado import netutil
 from tornado.iostream import IOStream, SSLIOStream, PipeIOStream, StreamClosedError, _StreamBuffer
 from tornado.httputil import HTTPHeaders
 from tornado.locks import Condition, Event
-from tornado.log import gen_log, app_log
+from tornado.log import gen_log
 from tornado.netutil import ssl_wrap_socket
-from tornado.stack_context import NullContext
 from tornado.tcpserver import TCPServer
 from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, bind_unused_port, ExpectLog, gen_test  # noqa: E501
-from tornado.test.util import (unittest, skipIfNonUnix, refusing_port, skipPypy3V58,
-                               ignore_deprecation)
+from tornado.test.util import unittest, skipIfNonUnix, refusing_port, skipPypy3V58
 from tornado.web import RequestHandler, Application
 import errno
 import hashlib
@@ -123,7 +121,7 @@ class TestIOStreamWebMixin(object):
         first_line = yield stream.read_until(b"\r\n")
         self.assertEqual(first_line, b"HTTP/1.1 200 OK\r\n")
         # callback=None is equivalent to no callback.
-        header_data = yield stream.read_until(b"\r\n\r\n", callback=None)
+        header_data = yield stream.read_until(b"\r\n\r\n")
         headers = HTTPHeaders.parse(header_data.decode('latin1'))
         content_length = int(headers['Content-Length'])
         body = yield stream.read_bytes(content_length)
@@ -171,167 +169,6 @@ class TestReadWriteMixin(object):
         ws.close()
         rs.close()
 
-    @gen_test
-    def test_streaming_callback(self):
-        rs, ws = yield self.make_iostream_pair()
-        try:
-            chunks = []
-            cond = Condition()
-
-            def streaming_callback(data):
-                chunks.append(data)
-                cond.notify()
-
-            with ignore_deprecation():
-                fut = rs.read_bytes(6, streaming_callback=streaming_callback)
-            ws.write(b"1234")
-            while not chunks:
-                yield cond.wait()
-            ws.write(b"5678")
-            final_data = yield(fut)
-            self.assertFalse(final_data)
-            self.assertEqual(chunks, [b"1234", b"56"])
-
-            # the rest of the last chunk is still in the buffer
-            data = yield rs.read_bytes(2)
-            self.assertEqual(data, b"78")
-        finally:
-            rs.close()
-            ws.close()
-
-    @gen_test
-    def test_streaming_callback_with_final_callback(self):
-        rs, ws = yield self.make_iostream_pair()
-        try:
-            chunks = []
-            final_called = []
-            cond = Condition()
-
-            def streaming_callback(data):
-                chunks.append(data)
-                cond.notify()
-
-            def final_callback(data):
-                self.assertFalse(data)
-                final_called.append(True)
-                cond.notify()
-            with ignore_deprecation():
-                rs.read_bytes(6, callback=final_callback,
-                              streaming_callback=streaming_callback)
-            ws.write(b"1234")
-            while not chunks:
-                yield cond.wait()
-            ws.write(b"5678")
-            while not final_called:
-                yield cond.wait()
-            self.assertEqual(chunks, [b"1234", b"56"])
-
-            # the rest of the last chunk is still in the buffer
-            data = yield rs.read_bytes(2)
-            self.assertEqual(data, b"78")
-        finally:
-            rs.close()
-            ws.close()
-
-    @gen_test
-    def test_streaming_callback_with_data_in_buffer(self):
-        rs, ws = yield self.make_iostream_pair()
-        ws.write(b"abcd\r\nefgh")
-        data = yield rs.read_until(b"\r\n")
-        self.assertEqual(data, b"abcd\r\n")
-
-        streaming_fut = Future()
-        with ignore_deprecation():
-            rs.read_until_close(streaming_callback=streaming_fut.set_result)
-        data = yield streaming_fut
-        self.assertEqual(data, b"efgh")
-        rs.close()
-        ws.close()
-
-    @gen_test
-    def test_streaming_until_close(self):
-        rs, ws = yield self.make_iostream_pair()
-        try:
-            chunks = []
-            closed = [False]
-            cond = Condition()
-
-            def streaming_callback(data):
-                chunks.append(data)
-                cond.notify()
-
-            def close_callback(data):
-                assert not data, data
-                closed[0] = True
-                cond.notify()
-            with ignore_deprecation():
-                rs.read_until_close(callback=close_callback,
-                                    streaming_callback=streaming_callback)
-            ws.write(b"1234")
-            while len(chunks) != 1:
-                yield cond.wait()
-            yield ws.write(b"5678")
-            ws.close()
-            while not closed[0]:
-                yield cond.wait()
-            self.assertEqual(chunks, [b"1234", b"5678"])
-        finally:
-            ws.close()
-            rs.close()
-
-    @gen_test
-    def test_streaming_until_close_future(self):
-        rs, ws = yield self.make_iostream_pair()
-        try:
-            chunks = []
-
-            @gen.coroutine
-            def rs_task():
-                with ignore_deprecation():
-                    yield rs.read_until_close(streaming_callback=chunks.append)
-
-            @gen.coroutine
-            def ws_task():
-                yield ws.write(b"1234")
-                yield gen.sleep(0.01)
-                yield ws.write(b"5678")
-                ws.close()
-
-            yield [rs_task(), ws_task()]
-            self.assertEqual(chunks, [b"1234", b"5678"])
-        finally:
-            ws.close()
-            rs.close()
-
-    @gen_test
-    def test_delayed_close_callback(self):
-        # The scenario:  Server closes the connection while there is a pending
-        # read that can be served out of buffered data.  The client does not
-        # run the close_callback as soon as it detects the close, but rather
-        # defers it until after the buffered read has finished.
-        rs, ws = yield self.make_iostream_pair()
-        try:
-            event = Event()
-            rs.set_close_callback(event.set)
-            ws.write(b"12")
-            chunks = []
-
-            def callback1(data):
-                chunks.append(data)
-                with ignore_deprecation():
-                    rs.read_bytes(1, callback2)
-                ws.close()
-
-            def callback2(data):
-                chunks.append(data)
-            with ignore_deprecation():
-                rs.read_bytes(1, callback1)
-            yield event.wait()  # stopped by close_callback
-            self.assertEqual(chunks, [b"1", b"2"])
-        finally:
-            ws.close()
-            rs.close()
-
     @gen_test
     def test_future_delayed_close_callback(self):
         # Same as test_delayed_close_callback, but with the future interface.
@@ -393,29 +230,6 @@ class TestReadWriteMixin(object):
             ws.close()
             rs.close()
 
-    @gen_test
-    def test_streaming_read_until_close_after_close(self):
-        # Same as the preceding test but with a streaming_callback.
-        # All data should go through the streaming callback,
-        # and the final read callback just gets an empty string.
-        rs, ws = yield self.make_iostream_pair()
-        try:
-            ws.write(b"1234")
-            ws.close()
-            data = yield rs.read_bytes(1)
-            self.assertEqual(data, b"1")
-            streaming_data = []
-            final_future = Future()
-            with ignore_deprecation():
-                rs.read_until_close(final_future.set_result,
-                                    streaming_callback=streaming_data.append)
-            final_data = yield final_future
-            self.assertEqual(b'', final_data)
-            self.assertEqual(b''.join(streaming_data), b"234")
-        finally:
-            ws.close()
-            rs.close()
-
     @gen_test
     def test_large_read_until(self):
         # Performance test: read_until used to have a quadratic component
@@ -553,25 +367,6 @@ class TestReadWriteMixin(object):
             ws.close()
             rs.close()
 
-    @gen_test
-    def test_read_until_max_bytes_inline_legacy(self):
-        rs, ws = yield self.make_iostream_pair()
-        closed = Event()
-        rs.set_close_callback(closed.set)
-        try:
-            # Similar to the error case in the previous test, but the
-            # ws writes first so rs reads are satisfied
-            # inline.  For consistency with the out-of-line case, we
-            # do not raise the error synchronously.
-            ws.write(b"123456")
-            with ExpectLog(gen_log, "Unsatisfiable read"):
-                with ignore_deprecation():
-                    rs.read_until(b"def", callback=lambda x: self.fail(), max_bytes=5)
-                yield closed.wait()
-        finally:
-            ws.close()
-            rs.close()
-
     @gen_test
     def test_read_until_max_bytes_inline(self):
         rs, ws = yield self.make_iostream_pair()
@@ -874,33 +669,6 @@ class TestIOStreamMixin(TestReadWriteMixin):
         listener.close()
         raise gen.Return((server_stream, client_stream))
 
-    def test_connection_refused_legacy(self):
-        # When a connection is refused, the connect callback should not
-        # be run.  (The kqueue IOLoop used to behave differently from the
-        # epoll IOLoop in this respect)
-        cleanup_func, port = refusing_port()
-        self.addCleanup(cleanup_func)
-        stream = IOStream(socket.socket())
-        self.connect_called = False
-
-        def connect_callback():
-            self.connect_called = True
-            self.stop()
-        stream.set_close_callback(self.stop)
-        # log messages vary by platform and ioloop implementation
-        with ExpectLog(gen_log, ".*", required=False):
-            with ignore_deprecation():
-                stream.connect(("127.0.0.1", port), connect_callback)
-            self.wait()
-        self.assertFalse(self.connect_called)
-        self.assertTrue(isinstance(stream.error, socket.error), stream.error)
-        if sys.platform != 'cygwin':
-            _ERRNO_CONNREFUSED = (errno.ECONNREFUSED,)
-            if hasattr(errno, "WSAECONNREFUSED"):
-                _ERRNO_CONNREFUSED += (errno.WSAECONNREFUSED,)
-            # cygwin's errnos don't match those used on native windows python
-            self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
-
     @gen_test
     def test_connection_refused(self):
         # When a connection is refused, the connect callback should not
@@ -941,27 +709,6 @@ class TestIOStreamMixin(TestReadWriteMixin):
                 yield stream.connect(('localhost', 80))
             self.assertTrue(isinstance(stream.error, socket.gaierror))
 
-    @gen_test
-    def test_read_callback_error(self):
-        # Test that IOStream sets its exc_info when a read callback throws
-        server, client = yield self.make_iostream_pair()
-        try:
-            closed = Event()
-            server.set_close_callback(closed.set)
-            with ExpectLog(
-                app_log, "(Uncaught exception|Exception in callback)"
-            ):
-                # Clear ExceptionStackContext so IOStream catches error
-                with NullContext():
-                    with ignore_deprecation():
-                        server.read_bytes(1, callback=lambda data: 1 / 0)
-                client.write(b"1")
-                yield closed.wait()
-            self.assertTrue(isinstance(server.error, ZeroDivisionError))
-        finally:
-            server.close()
-            client.close()
-
     @unittest.skipIf(mock is None, 'mock package not present')
     @gen_test
     def test_read_until_close_with_error(self):
@@ -1228,27 +975,6 @@ class WaitForHandshakeTest(AsyncTestCase):
             if client is not None:
                 client.close()
 
-    @gen_test
-    def test_wait_for_handshake_callback(self):
-        test = self
-        handshake_future = Future()
-
-        class TestServer(TCPServer):
-            def handle_stream(self, stream, address):
-                # The handshake has not yet completed.
-                test.assertIsNone(stream.socket.cipher())
-                self.stream = stream
-                with ignore_deprecation():
-                    stream.wait_for_handshake(self.handshake_done)
-
-            def handshake_done(self):
-                # Now the handshake is done and ssl information is available.
-                test.assertIsNotNone(self.stream.socket.cipher())
-                handshake_future.set_result(None)
-
-        yield self.connect_to_server(TestServer)
-        yield handshake_future
-
     @gen_test
     def test_wait_for_handshake_future(self):
         test = self