self.stream.close()
return
if self._decompressor:
- data = self._decompressor(data)
+ data = (self._decompressor.decompress(data) +
+ self._decompressor.flush())
if self.request.streaming_callback:
if self.chunks is None:
# if chunks is not None, we already called streaming_callback
# TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
length = int(data.strip(), 16)
if length == 0:
- # all the data has been decompressed, so we don't need to
- # decompress again in _on_body
- self._decompressor = None
+ if self._decompressor is not None:
+ tail = self._decompressor.flush()
+ if tail:
+ # I believe the tail will always be empty (i.e.
+ # decompress will return all it can). The purpose
+ # of the flush call is to detect errors such
+ # as truncated input. But in case it ever returns
+ # anything, treat it as an extra chunk
+ if self.request.streaming_callback is not None:
+ self.request.streaming_callback(tail)
+ else:
+ self.chunks.append(tail)
+ # all the data has been decompressed, so we don't need to
+ # decompress again in _on_body
+ self._decompressor = None
self._on_body(b('').join(self.chunks))
else:
self.stream.read_bytes(length + 2, # chunk ends with \r\n
assert data[-2:] == b("\r\n")
chunk = data[:-2]
if self._decompressor:
- chunk = self._decompressor(chunk)
+ chunk = self._decompressor.decompress(chunk)
if self.request.streaming_callback is not None:
self.request.streaming_callback(chunk)
else:
class GzipDecompressor(object):
+ """Streaming gzip decompressor.
+
+ The interface is like that of `zlib.decompressobj` (without the
+ optional arguments), but it understands gzip headers and checksums.
+ """
def __init__(self):
# Magic parameter makes zlib module understand gzip header
# http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
+ # This works on cpython and pypy, but not jython.
self.decompressobj = zlib.decompressobj(16 + zlib.MAX_WBITS)
- def __call__(self, value):
+ def decompress(self, value):
+ """Decompress a chunk, returning newly-available data.
+
+ Some data may be buffered for later processing; `flush` must
+ be called when there is no more input data to ensure that
+ all data was processed.
+ """
return self.decompressobj.decompress(value)
+ def flush(self):
+ """Return any remaining buffered data not yet returned by decompress.
+
+ Also checks for errors such as truncated input.
+ No other methods may be called on this object after `flush`.
+ """
+ return self.decompressobj.flush()
+
def import_object(name):
"""Imports an object by name.