.. versionadded:: 4.0
"""
+import asyncio
import logging
import re
import types
from tornado.util import GzipDecompressor
-from typing import (
- cast,
- Optional,
- Type,
- Awaitable,
- Generator,
- Any,
- Callable,
- Union,
- Tuple,
-)
+from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple
class _QuietException(Exception):
delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
return self._read_message(delegate)
- @gen.coroutine
- def _read_message(
- self, delegate: httputil.HTTPMessageDelegate
- ) -> Generator[Any, Any, bool]:
+ async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
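+ # The boolean result is True if the stream is still open, i.e. the
+ # connection can be reused for another message.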
need_delegate_close = False
try:
header_future = self.stream.read_until_regex(
b"\r?\n\r?\n", max_bytes=self.params.max_header_size
)
if self.params.header_timeout is None:
- header_data = yield header_future
+ header_data = await header_future
else:
try:
- header_data = yield gen.with_timeout(
+ header_data = await gen.with_timeout(
self.stream.io_loop.time() + self.params.header_timeout,
header_future,
quiet_exceptions=iostream.StreamClosedError,
with _ExceptionLoggingContext(app_log):
header_recv_future = delegate.headers_received(start_line, headers)
if header_recv_future is not None:
- yield header_recv_future
+ await header_recv_future
if self.stream is None:
# We've been detached.
need_delegate_close = False
)
# TODO: client delegates will get headers_received twice
# in the case of a 100-continue. Document or change?
- yield self._read_message(delegate)
+ await self._read_message(delegate)
else:
if headers.get("Expect") == "100-continue" and not self._write_finished:
self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
)
if body_future is not None:
if self._body_timeout is None:
- yield body_future
+ await body_future
else:
try:
- yield gen.with_timeout(
+ await gen.with_timeout(
self.stream.io_loop.time() + self._body_timeout,
body_future,
quiet_exceptions=iostream.StreamClosedError,
and not self.stream.closed()
):
self.stream.set_close_callback(self._on_connection_close)
- yield self._finish_future
+ await self._finish_future
if self.is_client and self._disconnect_on_finish:
self.close()
if self.stream is None:
except httputil.HTTPInputError as e:
gen_log.info("Malformed HTTP message from %s: %s", self.context, e)
if not self.is_client:
- yield self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
+ await self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
self.close()
return False
finally:
return self._read_body_until_close(delegate)
return None
- @gen.coroutine
- def _read_fixed_body(
+ async def _read_fixed_body(
self, content_length: int, delegate: httputil.HTTPMessageDelegate
- ) -> Generator[Any, Any, None]:
+ ) -> None:
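+ # Read the body in pieces of at most chunk_size bytes; partial=True
+ # makes read_bytes return as soon as any data is available.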
while content_length > 0:
- body = yield self.stream.read_bytes(
+ body = await self.stream.read_bytes(
min(self.params.chunk_size, content_length), partial=True
)
content_length -= len(body)
with _ExceptionLoggingContext(app_log):
ret = delegate.data_received(body)
if ret is not None:
- yield ret
+ await ret
- @gen.coroutine
- def _read_chunked_body(
- self, delegate: httputil.HTTPMessageDelegate
- ) -> Generator[Any, Any, None]:
+ async def _read_chunked_body(self, delegate: httputil.HTTPMessageDelegate) -> None:
# TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
total_size = 0
while True:
- chunk_len = yield self.stream.read_until(b"\r\n", max_bytes=64)
- chunk_len = int(chunk_len.strip(), 16)
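+ # The chunk size is a hexadecimal count on its own line; chunk
+ # extensions (see the TODO above) are unhandled and would make int() raise.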
+ chunk_len_str = await self.stream.read_until(b"\r\n", max_bytes=64)
+ chunk_len = int(chunk_len_str.strip(), 16)
if chunk_len == 0:
- crlf = yield self.stream.read_bytes(2)
+ crlf = await self.stream.read_bytes(2)
if crlf != b"\r\n":
raise httputil.HTTPInputError(
"improperly terminated chunked request"
raise httputil.HTTPInputError("chunked body too large")
bytes_to_read = chunk_len
while bytes_to_read:
- chunk = yield self.stream.read_bytes(
+ chunk = await self.stream.read_bytes(
min(bytes_to_read, self.params.chunk_size), partial=True
)
bytes_to_read -= len(chunk)
with _ExceptionLoggingContext(app_log):
ret = delegate.data_received(chunk)
if ret is not None:
- yield ret
+ await ret
# chunk ends with \r\n
- crlf = yield self.stream.read_bytes(2)
+ crlf = await self.stream.read_bytes(2)
assert crlf == b"\r\n"
- @gen.coroutine
- def _read_body_until_close(
+ async def _read_body_until_close(
self, delegate: httputil.HTTPMessageDelegate
- ) -> Generator[Any, Any, None]:
- body = yield self.stream.read_until_close()
+ ) -> None:
+ body = await self.stream.read_until_close()
if not self._write_finished or self.is_client:
with _ExceptionLoggingContext(app_log):
- delegate.data_received(body)
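+ # data_received may return an Awaitable; the old code dropped it,
+ # losing flow control. Await it like the other read paths do.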
+ ret = delegate.data_received(body)
+ if ret is not None:
+ await ret
class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
del headers["Content-Encoding"]
return self._delegate.headers_received(start_line, headers)
- @gen.coroutine
- def data_received(self, chunk: bytes) -> Generator[Any, Any, None]:
+ async def data_received(self, chunk: bytes) -> None:
if self._decompressor:
compressed_data = chunk
while compressed_data:
if decompressed:
ret = self._delegate.data_received(decompressed)
if ret is not None:
- yield ret
+ await ret
compressed_data = self._decompressor.unconsumed_tail
else:
ret = self._delegate.data_received(chunk)
if ret is not None:
- yield ret
+ await ret
def finish(self) -> None:
if self._decompressor is not None:
tail = self._decompressor.flush()
if tail:
- # I believe the tail will always be empty (i.e.
- # decompress will return all it can). The purpose
- # of the flush call is to detect errors such
- # as truncated input. But in case it ever returns
- # anything, treat it as an extra chunk
- self._delegate.data_received(tail)
+ # The tail should always be empty: decompress returned
+ # all that it can in data_received and the only
+ # purpose of the flush call is to detect errors such
+ # as truncated input. If we did legitimately get a new
+ # chunk at this point we'd need to change the
+ # interface to make finish() a coroutine.
+ raise ValueError(
+ "decompressor.flush returned data; possile truncated input"
+ )
return self._delegate.finish()
def on_connection_close(self) -> None:
self.context = context
self._serving_future = None # type: Optional[Future[None]]
- @gen.coroutine
- def close(self) -> Generator[Any, Any, None]:
+ async def close(self) -> None:
"""Closes the connection.
- Returns a `.Future` that resolves after the serving loop has exited.
+ When awaited, returns after the serving loop has exited.
self.stream.close()
# Block until the serving loop is done, but ignore any exceptions
# (start_serving is already responsible for logging them).
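+ # close() is expected to run only after start_serving has set
+ # _serving_future; the assert narrows the Optional for type checkers.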
+ assert self._serving_future is not None
try:
- yield self._serving_future
+ await self._serving_future
except Exception:
pass
:arg delegate: a `.HTTPServerConnectionDelegate`
"""
assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
- fut = self._server_request_loop(delegate)
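+ # gen.convert_yielded wraps the native coroutine in a Future so it
+ # can be registered with add_future below.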
+ fut = gen.convert_yielded(self._server_request_loop(delegate))
self._serving_future = fut
# Register the future on the IOLoop so its errors get logged.
self.stream.io_loop.add_future(fut, lambda f: f.result())
- @gen.coroutine
- def _server_request_loop(
+ async def _server_request_loop(
self, delegate: httputil.HTTPServerConnectionDelegate
- ) -> Generator[Any, Any, None]:
+ ) -> None:
try:
while True:
conn = HTTP1Connection(self.stream, False, self.params, self.context)
request_delegate = delegate.start_request(self, conn)
try:
- ret = yield conn.read_response(request_delegate)
+ ret = await conn.read_response(request_delegate)
except (iostream.StreamClosedError, iostream.UnsatisfiableReadError):
return
except _QuietException:
return
if not ret:
return
- yield gen.moment
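+ # Like gen.moment, asyncio.sleep(0) yields to the event loop for one
+ # iteration so pipelined requests cannot starve other connections.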
+ await asyncio.sleep(0)
finally:
delegate.on_close(self)
from tornado import gen, netutil
-from tornado.concurrent import Future
from tornado.escape import (
json_decode,
json_encode,
from typing import Dict, List # noqa: F401
-def read_stream_body(stream, callback):
- """Reads an HTTP response from `stream` and runs callback with its
+async def read_stream_body(stream):
+ """Reads an HTTP response from `stream` and returns a tuple of its
start_line, headers and body."""
chunks = []
def finish(self):
conn.detach() # type: ignore
- callback((self.start_line, self.headers, b"".join(chunks)))
conn = HTTP1Connection(stream, True)
- conn.read_response(Delegate())
+ delegate = Delegate()
+ await conn.read_response(delegate)
+ return delegate.start_line, delegate.headers, b"".join(chunks)
class HandlerBaseTestCase(AsyncHTTPTestCase):
+ newline
+ body
)
- read_stream_body(stream, self.stop)
- start_line, headers, body = self.wait()
+ start_line, headers, body = self.io_loop.run_sync(
+ lambda: read_stream_body(stream)
+ )
return body
def test_multipart_form(self):
def test_malformed_first_line_response(self):
with ExpectLog(gen_log, ".*Malformed HTTP request line"):
self.stream.write(b"asdf\r\n\r\n")
- read_stream_body(self.stream, self.stop)
- start_line, headers, response = self.wait()
+ start_line, headers, response = self.io_loop.run_sync(
+ lambda: read_stream_body(self.stream)
+ )
self.assertEqual("HTTP/1.1", start_line.version)
self.assertEqual(400, start_line.code)
self.assertEqual("Bad Request", start_line.reason)
b"\n", b"\r\n"
)
)
- read_stream_body(self.stream, self.stop)
- start_line, headers, response = self.wait()
+ start_line, headers, response = self.io_loop.run_sync(
+ lambda: read_stream_body(self.stream)
+ )
self.assertEqual(json_decode(response), {u"foo": [u"bar"]})
def test_chunked_request_uppercase(self):
b"\n", b"\r\n"
)
)
- read_stream_body(self.stream, self.stop)
- start_line, headers, response = self.wait()
+ start_line, headers, response = self.io_loop.run_sync(
+ lambda: read_stream_body(self.stream)
+ )
self.assertEqual(json_decode(response), {u"foo": [u"bar"]})
@gen_test
b"Content-Length: 10240\r\n\r\n"
)
stream.write(b"a" * 10240)
- fut = Future() # type: Future[bytes]
- read_stream_body(stream, callback=fut.set_result)
- start_line, headers, response = yield fut
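+ # A @gen_test coroutine may yield a native coroutine; the gen runner
+ # converts it with convert_yielded.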
+ start_line, headers, response = yield read_stream_body(stream)
self.assertEqual(response, b"10240")
# Without the ?expected_size parameter, we get the old default value
stream.write(