]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add a `streaming` option to _run_read_callback and move all calls to _consume
authorBen Darnell <ben@bendarnell.com>
Mon, 14 Apr 2014 01:25:10 +0000 (21:25 -0400)
committerBen Darnell <ben@bendarnell.com>
Mon, 14 Apr 2014 01:25:10 +0000 (21:25 -0400)
into that method.

tornado/iostream.py

index 601d49512cca889420c99649e5304a89d1d70f9b..5d7fb018b3c8a6b548fc381fa845d95190849765 100644 (file)
@@ -236,9 +236,8 @@ class BaseIOStream(object):
         self._streaming_callback = stack_context.wrap(streaming_callback)
         if self.closed():
             if self._streaming_callback is not None:
-                self._run_callback(self._streaming_callback,
-                                   self._consume(self._read_buffer_size))
-            self._run_read_callback(self._consume(self._read_buffer_size))
+                self._run_read_callback(self._read_buffer_size, True)
+            self._run_read_callback(self._read_buffer_size, False)
             return future
         self._read_until_close = True
         self._streaming_callback = stack_context.wrap(streaming_callback)
@@ -296,10 +295,9 @@ class BaseIOStream(object):
             if self._read_until_close:
                 if (self._streaming_callback is not None and
                         self._read_buffer_size):
-                    self._run_callback(self._streaming_callback,
-                                       self._consume(self._read_buffer_size))
+                    self._run_read_callback(self._read_buffer_size, True)
                 self._read_until_close = False
-                self._run_read_callback(self._consume(self._read_buffer_size))
+                self._run_read_callback(self._read_buffer_size, False)
             if self._state is not None:
                 self.io_loop.remove_handler(self.fileno())
                 self._state = None
@@ -484,16 +482,20 @@ class BaseIOStream(object):
             self._read_future = TracebackFuture()
         return self._read_future
 
-    def _run_read_callback(self, data):
-        self._streaming_callback = None
+    def _run_read_callback(self, size, streaming):
+        if streaming:
+            callback = self._streaming_callback
+        else:
+            callback = self._read_callback
+            self._read_callback = self._streaming_callback = None
         if self._read_future is not None:
+            assert callback is None
             future = self._read_future
             self._read_future = None
-            future.set_result(data)
-        if self._read_callback is not None:
-            callback = self._read_callback
-            self._read_callback = None
-            self._run_callback(callback, data)
+            future.set_result(self._consume(size))
+        if callback is not None:
+            assert self._read_future is None
+            self._run_callback(callback, self._consume(size))
         else:
             # If we scheduled a callback, we will add the error listener
             # afterwards.  If we didn't, we have to do it now.
@@ -562,15 +564,14 @@ class BaseIOStream(object):
             if self._read_bytes is not None:
                 bytes_to_consume = min(self._read_bytes, bytes_to_consume)
                 self._read_bytes -= bytes_to_consume
-            self._run_callback(self._streaming_callback,
-                               self._consume(bytes_to_consume))
+            self._run_read_callback(bytes_to_consume, True)
         if (self._read_bytes is not None and
             (self._read_buffer_size >= self._read_bytes or
              (self._read_partial and self._read_buffer_size > 0))):
             num_bytes = min(self._read_bytes, self._read_buffer_size)
             self._read_bytes = None
             self._read_partial = False
-            self._run_read_callback(self._consume(num_bytes))
+            self._run_read_callback(num_bytes, False)
             return True
         elif self._read_delimiter is not None:
             # Multi-byte delimiters (e.g. '\r\n') may straddle two
@@ -580,7 +581,7 @@ class BaseIOStream(object):
             # length) tend to be "line" oriented, the delimiter is likely
             # to be in the first few chunks.  Merge the buffer gradually
             # since large merges are relatively expensive and get undone in
-            # consume().
+            # _consume().
             if self._read_buffer:
                 while True:
                     loc = self._read_buffer[0].find(self._read_delimiter)
@@ -589,8 +590,7 @@ class BaseIOStream(object):
                         self._check_max_bytes(self._read_delimiter,
                                               loc + delimiter_len)
                         self._read_delimiter = None
-                        self._run_read_callback(
-                            self._consume(loc + delimiter_len))
+                        self._run_read_callback(loc + delimiter_len, False)
                         return True
                     if len(self._read_buffer) == 1:
                         break
@@ -604,7 +604,7 @@ class BaseIOStream(object):
                     if m is not None:
                         self._check_max_bytes(self._read_regex, m.end())
                         self._read_regex = None
-                        self._run_read_callback(self._consume(m.end()))
+                        self._run_read_callback(m.end(), False)
                         return True
                     if len(self._read_buffer) == 1:
                         break