% self._expected_content_remaining
)
if self._chunking_output:
- if not self.stream.closed():
+ assert self._request_start_line
+ if not self.stream.closed() and (
+ self.is_client or self._request_start_line.method != "HEAD"
+ ):
self._pending_write = self.stream.write(b"0\r\n\r\n")
self._pending_write.add_done_callback(self._on_write_complete)
self._write_finished = True
# be written out in chunks.
self.write("".join(chr(i % 256) * 1024 for i in range(512)))
+ class TransferEncodingChunkedHandler(RequestHandler):
+ @gen.coroutine
+ def head(self):
+ self.write("Hello world")
+ yield self.flush()
+
class FinishOnCloseHandler(RequestHandler):
def initialize(self, cleanup_event):
self.cleanup_event = cleanup_event
[
("/", HelloHandler),
("/large", LargeHandler),
+ ("/chunked", TransferEncodingChunkedHandler),
(
"/finish_on_close",
FinishOnCloseHandler,
self.assertEqual(self.headers["Connection"], "Keep-Alive")
self.close()
+ @gen_test
+ def test_keepalive_chunked_head_no_body(self):
+ yield self.connect()
+ self.stream.write(b"HEAD /chunked HTTP/1.1\r\n\r\n")
+ yield self.read_headers()
+
+ self.stream.write(b"HEAD /chunked HTTP/1.1\r\n\r\n")
+ yield self.read_headers()
+ self.close()
+
class GzipBaseTest(object):
def get_app(self):