]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Bring Future support for IOStream read operations into IOStream itself.
authorBen Darnell <ben@bendarnell.com>
Mon, 17 Feb 2014 02:39:19 +0000 (21:39 -0500)
committerBen Darnell <ben@bendarnell.com>
Mon, 17 Feb 2014 02:49:22 +0000 (21:49 -0500)
This speeds things up in comparison to the _iostream_return_future
decorator by removing unnecessary abstraction and especially by allowing
Futures to bypass some of the hoops IOStream jumps through to ensure a
clean slate for its callbacks.

tornado/iostream.py

index 6baea4a1364b06131afae529ddd54eda075bcf9f..11092adb7dff0c8c51af56a062d3627e10354e23 100644 (file)
@@ -127,6 +127,7 @@ class BaseIOStream(object):
         self._read_bytes = None
         self._read_until_close = False
         self._read_callback = None
+        self._read_future = None
         self._streaming_callback = None
         self._write_callback = None
         self._close_callback = None
@@ -176,30 +177,29 @@ class BaseIOStream(object):
         """
         return None
 
-    @_iostream_return_future
-    def read_until_regex(self, regex, callback):
+    def read_until_regex(self, regex, callback=None):
         """Run ``callback`` when we read the given regex pattern.
 
         The callback will get the data read (including the data that
         matched the regex and anything that came before it) as an argument.
         """
-        self._set_read_callback(callback)
+        future = self._set_read_callback(callback)
         self._read_regex = re.compile(regex)
         self._try_inline_read()
+        return future
 
-    @_iostream_return_future
-    def read_until(self, delimiter, callback):
+    def read_until(self, delimiter, callback=None):
         """Run ``callback`` when we read the given delimiter.
 
         The callback will get the data read (including the delimiter)
         as an argument.
         """
-        self._set_read_callback(callback)
+        future = self._set_read_callback(callback)
         self._read_delimiter = delimiter
         self._try_inline_read()
+        return future
 
-    @_iostream_return_future
-    def read_bytes(self, num_bytes, callback, streaming_callback=None):
+    def read_bytes(self, num_bytes, callback=None, streaming_callback=None):
         """Run callback when we read the given number of bytes.
 
         If a ``streaming_callback`` is given, it will be called with chunks
@@ -207,14 +207,14 @@ class BaseIOStream(object):
         ``callback`` will be empty.  Otherwise, the ``callback`` gets
         the data as an argument.
         """
-        self._set_read_callback(callback)
+        future = self._set_read_callback(callback)
         assert isinstance(num_bytes, numbers.Integral)
         self._read_bytes = num_bytes
         self._streaming_callback = stack_context.wrap(streaming_callback)
         self._try_inline_read()
+        return future
 
-    @_iostream_return_future
-    def read_until_close(self, callback, streaming_callback=None):
+    def read_until_close(self, callback=None, streaming_callback=None):
         """Reads all data from the socket until it is closed.
 
         If a ``streaming_callback`` is given, it will be called with chunks
@@ -225,20 +225,18 @@ class BaseIOStream(object):
         Subject to ``max_buffer_size`` limit from `IOStream` constructor if
         a ``streaming_callback`` is not used.
         """
-        self._set_read_callback(callback)
+        future = self._set_read_callback(callback)
         self._streaming_callback = stack_context.wrap(streaming_callback)
         if self.closed():
             if self._streaming_callback is not None:
                 self._run_callback(self._streaming_callback,
                                    self._consume(self._read_buffer_size))
-            self._run_callback(self._read_callback,
-                               self._consume(self._read_buffer_size))
-            self._streaming_callback = None
-            self._read_callback = None
-            return
+            self._run_read_callback(self._consume(self._read_buffer_size))
+            return future
         self._read_until_close = True
         self._streaming_callback = stack_context.wrap(streaming_callback)
         self._try_inline_read()
+        return future
 
     @_iostream_return_future
     def write(self, data, callback=None):
@@ -289,11 +287,8 @@ class BaseIOStream(object):
                         self._read_buffer_size):
                     self._run_callback(self._streaming_callback,
                                        self._consume(self._read_buffer_size))
-                callback = self._read_callback
-                self._read_callback = None
                 self._read_until_close = False
-                self._run_callback(callback,
-                                   self._consume(self._read_buffer_size))
+                self._run_read_callback(self._consume(self._read_buffer_size))
             if self._state is not None:
                 self.io_loop.remove_handler(self.fileno())
                 self._state = None
@@ -309,6 +304,9 @@ class BaseIOStream(object):
             # from the set as it is closed.
             for fut in list(self._pending_futures):
                 fut.set_exception(StreamClosedError())
+            if self._read_future is not None:
+                self._read_future.set_exception(StreamClosedError())
+                self._read_future = None
             if self._close_callback is not None:
                 cb = self._close_callback
                 self._close_callback = None
@@ -455,8 +453,24 @@ class BaseIOStream(object):
             self._maybe_run_close_callback()
 
     def _set_read_callback(self, callback):
-        assert not self._read_callback, "Already reading"
-        self._read_callback = stack_context.wrap(callback)
+        assert self._read_callback is None, "Already reading"
+        assert self._read_future is None, "Already reading"
+        if callback is not None:
+            self._read_callback = stack_context.wrap(callback)
+        else:
+            self._read_future = TracebackFuture()
+        return self._read_future
+
+    def _run_read_callback(self, data):
+        self._streaming_callback = None
+        if self._read_future is not None:
+            self._read_future.set_result(data)
+            self._read_future = None
+        if self._read_callback is not None:
+            callback = self._read_callback
+            self._read_callback = None
+            self._run_callback(callback, data)
+
 
     def _try_inline_read(self):
         """Attempt to complete the current read operation from buffered data.
@@ -532,11 +546,8 @@ class BaseIOStream(object):
                                self._consume(bytes_to_consume))
         if self._read_bytes is not None and self._read_buffer_size >= self._read_bytes:
             num_bytes = self._read_bytes
-            callback = self._read_callback
-            self._read_callback = None
-            self._streaming_callback = None
             self._read_bytes = None
-            self._run_callback(callback, self._consume(num_bytes))
+            self._run_read_callback(self._consume(num_bytes))
             return True
         elif self._read_delimiter is not None:
             # Multi-byte delimiters (e.g. '\r\n') may straddle two
@@ -551,13 +562,10 @@ class BaseIOStream(object):
                 while True:
                     loc = self._read_buffer[0].find(self._read_delimiter)
                     if loc != -1:
-                        callback = self._read_callback
                         delimiter_len = len(self._read_delimiter)
-                        self._read_callback = None
-                        self._streaming_callback = None
                         self._read_delimiter = None
-                        self._run_callback(callback,
-                                           self._consume(loc + delimiter_len))
+                        self._run_read_callback(
+                            self._consume(loc + delimiter_len))
                         return True
                     if len(self._read_buffer) == 1:
                         break
@@ -567,11 +575,8 @@ class BaseIOStream(object):
                 while True:
                     m = self._read_regex.search(self._read_buffer[0])
                     if m is not None:
-                        callback = self._read_callback
-                        self._read_callback = None
-                        self._streaming_callback = None
                         self._read_regex = None
-                        self._run_callback(callback, self._consume(m.end()))
+                        self._run_read_callback(self._consume(m.end()))
                         return True
                     if len(self._read_buffer) == 1:
                         break