]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Further refactoring of duplicated IOStream logic
authorBen Darnell <ben@bendarnell.com>
Mon, 20 Feb 2012 03:50:01 +0000 (19:50 -0800)
committerBen Darnell <ben@bendarnell.com>
Mon, 20 Feb 2012 03:50:01 +0000 (19:50 -0800)
tornado/iostream.py

index a0d8399a309e7651ab7fe3ce380d41c72a909325..51410c49671156d32d8e7f17bf9e564172c23e89 100644 (file)
@@ -137,15 +137,15 @@ class IOStream(object):
 
     def read_until_regex(self, regex, callback):
         """Call callback when we read the given regex pattern."""
-        assert not self._read_callback, "Already reading"
+        self._set_read_callback(callback)
         self._read_regex = re.compile(regex)
-        self._read_until(callback)
+        self._try_inline_read()
 
     def read_until(self, delimiter, callback):
         """Call callback when we read the given delimiter."""
-        assert not self._read_callback, "Already reading"
+        self._set_read_callback(callback)
         self._read_delimiter = delimiter
-        self._read_until(callback)
+        self._try_inline_read()
 
     def read_bytes(self, num_bytes, callback, streaming_callback=None):
         """Call callback when we read the given number of bytes.
@@ -154,11 +154,11 @@ class IOStream(object):
         of data as they become available, and the argument to the final
         ``callback`` will be empty.
         """
-        assert not self._read_callback, "Already reading"
+        self._set_read_callback(callback)
         assert isinstance(num_bytes, (int, long))
         self._read_bytes = num_bytes
         self._streaming_callback = stack_context.wrap(streaming_callback)
-        self._read_until(callback)
+        self._try_inline_read()
 
     def read_until_close(self, callback, streaming_callback=None):
         """Reads all data from the socket until it is closed.
@@ -170,12 +170,12 @@ class IOStream(object):
         Subject to ``max_buffer_size`` limit from `IOStream` constructor if
         a ``streaming_callback`` is not used.
         """
-        assert not self._read_callback, "Already reading"
+        self._set_read_callback(callback)
         if self.closed():
             self._run_callback(callback, self._consume(self._read_buffer_size))
+            self._read_callback = None
             return
         self._read_until_close = True
-        self._read_callback = stack_context.wrap(callback)
         self._streaming_callback = stack_context.wrap(streaming_callback)
         self._add_io_state(self.io_loop.READ)
 
@@ -327,11 +327,17 @@ class IOStream(object):
                 if self._read_from_buffer():
                     return
 
-    def _read_until(self, callback):
-        """Assign given read callback and initiate read to buffer
-        unless stream has already been read or closed.
+    def _set_read_callback(self, callback):
+        assert not self._read_callback, "Already reading"
+        self._read_callback = callback
+
+    def _try_inline_read(self):
+        """Attempt to complete the current read operation from buffered data.
+
+        If the read can be completed without blocking, schedules the
+        read callback on the next IOLoop iteration; otherwise starts
+        listening for reads on the socket.
         """
-        self._read_callback = stack_context.wrap(callback)
         while True:
             # See if we've already got the data from a previous read
             if self._read_from_buffer():