From: Tom Christie Date: Tue, 20 Aug 2019 10:37:37 +0000 (+0100) Subject: Simplify ASGI concurrency (#248) X-Git-Tag: 0.7.2~24 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2ed70ebc2a93b4e6c0f8cab0cd94241ace5ba0b5;p=thirdparty%2Fhttpx.git Simplify ASGI concurrency (#248) * Drop keyword argument * Improve docstrings for WSGIDispatch and ASGIDispatch * Add docs for fine grained WSGI/ASGI control * Simplify concurrency handling in ASGIDispatch * Variable renaming * Fix missing nonlocal declaration * Split nonlocal onto multiple lines --- diff --git a/httpx/dispatch/asgi.py b/httpx/dispatch/asgi.py index 23eebb0f..e3b4b0a0 100644 --- a/httpx/dispatch/asgi.py +++ b/httpx/dispatch/asgi.py @@ -78,7 +78,7 @@ class ASGIDispatch(AsyncDispatcher): app_exc = None status_code = None headers = None - response_started = asyncio.Event() + response_started_or_failed = asyncio.Event() response_body = BodyIterator() request_stream = request.stream() @@ -92,19 +92,20 @@ class ASGIDispatch(AsyncDispatcher): return {"type": "http.request", "body": body, "more_body": True} async def send(message: dict) -> None: - nonlocal status_code, headers, response_started, response_body, request + nonlocal status_code, headers, response_started_or_failed + nonlocal response_body, request if message["type"] == "http.response.start": status_code = message["status"] headers = message.get("headers", []) - response_started.set() + response_started_or_failed.set() elif message["type"] == "http.response.body": body = message.get("body", b"") more_body = message.get("more_body", False) if body and request.method != "HEAD": await response_body.put(body) if not more_body: - await response_body.done() + await response_body.mark_as_done() async def run_app() -> None: nonlocal app, scope, receive, send, app_exc, response_body @@ -113,7 +114,8 @@ class ASGIDispatch(AsyncDispatcher): except Exception as exc: app_exc = exc finally: - await response_body.done() + await response_body.mark_as_done() + response_started_or_failed.set() # Really we'd like to push all `asyncio` logic into concurrency.py, # with a standardized interface, so that we can support other event @@ -122,17 +124,13 @@ class ASGIDispatch(AsyncDispatcher): # `ConcurrencyBackend` with the `Client(app=asgi_app)` case. loop = asyncio.get_event_loop() app_task = loop.create_task(run_app()) - response_task = loop.create_task(response_started.wait()) - tasks = {app_task, response_task} # type: typing.Set[asyncio.Task] - - await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + await response_started_or_failed.wait() if app_exc is not None and self.raise_app_exceptions: raise app_exc - assert response_started.is_set(), "application did not return a response." - assert status_code is not None + assert status_code is not None, "application did not return a response." assert headers is not None async def on_close() -> None: @@ -189,7 +187,7 @@ class BodyIterator: """ await self._queue.put(data) - async def done(self) -> None: + async def mark_as_done(self) -> None: """ Used by the server to signal the end of the response body. """