From: Ben Darnell Date: Mon, 17 Feb 2014 02:39:19 +0000 (-0500) Subject: Bring Future support for IOStream read operations into IOStream itself. X-Git-Tag: v4.0.0b1~123 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1653350825a689033026066b7f405a650de8e79d;p=thirdparty%2Ftornado.git Bring Future support for IOStream read operations into IOStream itself. This speeds things up in comparison to the _iostream_return_future decorator by removing unnecessary abstraction and especially by allowing Futures to bypass some of the hoops IOStream jumps through to ensure a clean slate for its callbacks. --- diff --git a/tornado/iostream.py b/tornado/iostream.py index 6baea4a13..11092adb7 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -127,6 +127,7 @@ class BaseIOStream(object): self._read_bytes = None self._read_until_close = False self._read_callback = None + self._read_future = None self._streaming_callback = None self._write_callback = None self._close_callback = None @@ -176,30 +177,29 @@ class BaseIOStream(object): """ return None - @_iostream_return_future - def read_until_regex(self, regex, callback): + def read_until_regex(self, regex, callback=None): """Run ``callback`` when we read the given regex pattern. The callback will get the data read (including the data that matched the regex and anything that came before it) as an argument. """ - self._set_read_callback(callback) + future = self._set_read_callback(callback) self._read_regex = re.compile(regex) self._try_inline_read() + return future - @_iostream_return_future - def read_until(self, delimiter, callback): + def read_until(self, delimiter, callback=None): """Run ``callback`` when we read the given delimiter. The callback will get the data read (including the delimiter) as an argument. """ - self._set_read_callback(callback) + future = self._set_read_callback(callback) self._read_delimiter = delimiter self._try_inline_read() + return future - @_iostream_return_future - def read_bytes(self, num_bytes, callback, streaming_callback=None): + def read_bytes(self, num_bytes, callback=None, streaming_callback=None): """Run callback when we read the given number of bytes. If a ``streaming_callback`` is given, it will be called with chunks @@ -207,14 +207,14 @@ class BaseIOStream(object): ``callback`` will be empty. Otherwise, the ``callback`` gets the data as an argument. """ - self._set_read_callback(callback) + future = self._set_read_callback(callback) assert isinstance(num_bytes, numbers.Integral) self._read_bytes = num_bytes self._streaming_callback = stack_context.wrap(streaming_callback) self._try_inline_read() + return future - @_iostream_return_future - def read_until_close(self, callback, streaming_callback=None): + def read_until_close(self, callback=None, streaming_callback=None): """Reads all data from the socket until it is closed. If a ``streaming_callback`` is given, it will be called with chunks @@ -225,20 +225,18 @@ class BaseIOStream(object): Subject to ``max_buffer_size`` limit from `IOStream` constructor if a ``streaming_callback`` is not used. """ - self._set_read_callback(callback) + future = self._set_read_callback(callback) self._streaming_callback = stack_context.wrap(streaming_callback) if self.closed(): if self._streaming_callback is not None: self._run_callback(self._streaming_callback, self._consume(self._read_buffer_size)) - self._run_callback(self._read_callback, - self._consume(self._read_buffer_size)) - self._streaming_callback = None - self._read_callback = None - return + self._run_read_callback(self._consume(self._read_buffer_size)) + return future self._read_until_close = True self._streaming_callback = stack_context.wrap(streaming_callback) self._try_inline_read() + return future @_iostream_return_future def write(self, data, callback=None): @@ -289,11 +287,8 @@ class BaseIOStream(object): self._read_buffer_size): self._run_callback(self._streaming_callback, self._consume(self._read_buffer_size)) - callback = self._read_callback - self._read_callback = None self._read_until_close = False - self._run_callback(callback, - self._consume(self._read_buffer_size)) + self._run_read_callback(self._consume(self._read_buffer_size)) if self._state is not None: self.io_loop.remove_handler(self.fileno()) self._state = None @@ -309,6 +304,9 @@ class BaseIOStream(object): # from the set as it is closed. for fut in list(self._pending_futures): fut.set_exception(StreamClosedError()) + if self._read_future is not None: + self._read_future.set_exception(StreamClosedError()) + self._read_future = None if self._close_callback is not None: cb = self._close_callback self._close_callback = None @@ -455,8 +453,24 @@ class BaseIOStream(object): self._maybe_run_close_callback() def _set_read_callback(self, callback): - assert not self._read_callback, "Already reading" - self._read_callback = stack_context.wrap(callback) + assert self._read_callback is None, "Already reading" + assert self._read_future is None, "Already reading" + if callback is not None: + self._read_callback = stack_context.wrap(callback) + else: + self._read_future = TracebackFuture() + return self._read_future + + def _run_read_callback(self, data): + self._streaming_callback = None + if self._read_future is not None: + self._read_future.set_result(data) + self._read_future = None + if self._read_callback is not None: + callback = self._read_callback + self._read_callback = None + self._run_callback(callback, data) + def _try_inline_read(self): """Attempt to complete the current read operation from buffered data. @@ -532,11 +546,8 @@ class BaseIOStream(object): self._consume(bytes_to_consume)) if self._read_bytes is not None and self._read_buffer_size >= self._read_bytes: num_bytes = self._read_bytes - callback = self._read_callback - self._read_callback = None - self._streaming_callback = None self._read_bytes = None - self._run_callback(callback, self._consume(num_bytes)) + self._run_read_callback(self._consume(num_bytes)) return True elif self._read_delimiter is not None: # Multi-byte delimiters (e.g. '\r\n') may straddle two @@ -551,13 +562,10 @@ class BaseIOStream(object): while True: loc = self._read_buffer[0].find(self._read_delimiter) if loc != -1: - callback = self._read_callback delimiter_len = len(self._read_delimiter) - self._read_callback = None - self._streaming_callback = None self._read_delimiter = None - self._run_callback(callback, - self._consume(loc + delimiter_len)) + self._run_read_callback( + self._consume(loc + delimiter_len)) return True if len(self._read_buffer) == 1: break @@ -567,11 +575,8 @@ class BaseIOStream(object): while True: m = self._read_regex.search(self._read_buffer[0]) if m is not None: - callback = self._read_callback - self._read_callback = None - self._streaming_callback = None self._read_regex = None - self._run_callback(callback, self._consume(m.end())) + self._run_read_callback(self._consume(m.end())) return True if len(self._read_buffer) == 1: break