From: Ben Darnell Date: Mon, 14 Apr 2014 01:25:10 +0000 (-0400) Subject: Add a `streaming` option to _run_read_callback and move all calls to _consume X-Git-Tag: v4.0.0b1~91^2~26 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=243f28272274060922bfb2617a588d5b62b65cd2;p=thirdparty%2Ftornado.git Add a `streaming` option to _run_read_callback and move all calls to _consume into that method. --- diff --git a/tornado/iostream.py b/tornado/iostream.py index 601d49512..5d7fb018b 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -236,9 +236,8 @@ class BaseIOStream(object): self._streaming_callback = stack_context.wrap(streaming_callback) if self.closed(): if self._streaming_callback is not None: - self._run_callback(self._streaming_callback, - self._consume(self._read_buffer_size)) - self._run_read_callback(self._consume(self._read_buffer_size)) + self._run_read_callback(self._read_buffer_size, True) + self._run_read_callback(self._read_buffer_size, False) return future self._read_until_close = True self._streaming_callback = stack_context.wrap(streaming_callback) @@ -296,10 +295,9 @@ class BaseIOStream(object): if self._read_until_close: if (self._streaming_callback is not None and self._read_buffer_size): - self._run_callback(self._streaming_callback, - self._consume(self._read_buffer_size)) + self._run_read_callback(self._read_buffer_size, True) self._read_until_close = False - self._run_read_callback(self._consume(self._read_buffer_size)) + self._run_read_callback(self._read_buffer_size, False) if self._state is not None: self.io_loop.remove_handler(self.fileno()) self._state = None @@ -484,16 +482,20 @@ class BaseIOStream(object): self._read_future = TracebackFuture() return self._read_future - def _run_read_callback(self, data): - self._streaming_callback = None + def _run_read_callback(self, size, streaming): + if streaming: + callback = self._streaming_callback + else: + callback = self._read_callback + self._read_callback = self._streaming_callback = None if self._read_future is not None: + assert callback is None future = self._read_future self._read_future = None - future.set_result(data) - if self._read_callback is not None: - callback = self._read_callback - self._read_callback = None - self._run_callback(callback, data) + future.set_result(self._consume(size)) + if callback is not None: + assert self._read_future is None + self._run_callback(callback, self._consume(size)) else: # If we scheduled a callback, we will add the error listener # afterwards. If we didn't, we have to do it now. @@ -562,15 +564,14 @@ class BaseIOStream(object): if self._read_bytes is not None: bytes_to_consume = min(self._read_bytes, bytes_to_consume) self._read_bytes -= bytes_to_consume - self._run_callback(self._streaming_callback, - self._consume(bytes_to_consume)) + self._run_read_callback(bytes_to_consume, True) if (self._read_bytes is not None and (self._read_buffer_size >= self._read_bytes or (self._read_partial and self._read_buffer_size > 0))): num_bytes = min(self._read_bytes, self._read_buffer_size) self._read_bytes = None self._read_partial = False - self._run_read_callback(self._consume(num_bytes)) + self._run_read_callback(num_bytes, False) return True elif self._read_delimiter is not None: # Multi-byte delimiters (e.g. '\r\n') may straddle two @@ -580,7 +581,7 @@ class BaseIOStream(object): # length) tend to be "line" oriented, the delimiter is likely # to be in the first few chunks. Merge the buffer gradually # since large merges are relatively expensive and get undone in - # consume(). + # _consume(). if self._read_buffer: while True: loc = self._read_buffer[0].find(self._read_delimiter) @@ -589,8 +590,7 @@ class BaseIOStream(object): self._check_max_bytes(self._read_delimiter, loc + delimiter_len) self._read_delimiter = None - self._run_read_callback( - self._consume(loc + delimiter_len)) + self._run_read_callback(loc + delimiter_len, False) return True if len(self._read_buffer) == 1: break @@ -604,7 +604,7 @@ class BaseIOStream(object): if m is not None: self._check_max_bytes(self._read_regex, m.end()) self._read_regex = None - self._run_read_callback(self._consume(m.end())) + self._run_read_callback(m.end(), False) return True if len(self._read_buffer) == 1: break