]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Simplify ASGI concurrency (#248)
authorTom Christie <tom@tomchristie.com>
Tue, 20 Aug 2019 10:37:37 +0000 (11:37 +0100)
committerGitHub <noreply@github.com>
Tue, 20 Aug 2019 10:37:37 +0000 (11:37 +0100)
* Drop  keyword argument

* Improve docstrings for WSGIDispatch and ASGIDispatch

* Add docs for fine grained WSGI/ASGI control

* Simplify concurrency handling in ASGIDispatch

* Variable renaming

* Fix missing nonlocal declaration

* Split nonlocal onto multiple lines

httpx/dispatch/asgi.py

index 23eebb0fc94da21dbdcc0074875501feaa78a381..e3b4b0a0dfb4f9973f35891ba881699fc3f2a93f 100644 (file)
@@ -78,7 +78,7 @@ class ASGIDispatch(AsyncDispatcher):
         app_exc = None
         status_code = None
         headers = None
-        response_started = asyncio.Event()
+        response_started_or_failed = asyncio.Event()
         response_body = BodyIterator()
         request_stream = request.stream()
 
@@ -92,19 +92,20 @@ class ASGIDispatch(AsyncDispatcher):
             return {"type": "http.request", "body": body, "more_body": True}
 
         async def send(message: dict) -> None:
-            nonlocal status_code, headers, response_started, response_body, request
+            nonlocal status_code, headers, response_started_or_failed
+            nonlocal response_body, request
 
             if message["type"] == "http.response.start":
                 status_code = message["status"]
                 headers = message.get("headers", [])
-                response_started.set()
+                response_started_or_failed.set()
             elif message["type"] == "http.response.body":
                 body = message.get("body", b"")
                 more_body = message.get("more_body", False)
                 if body and request.method != "HEAD":
                     await response_body.put(body)
                 if not more_body:
-                    await response_body.done()
+                    await response_body.mark_as_done()
 
         async def run_app() -> None:
             nonlocal app, scope, receive, send, app_exc, response_body
@@ -113,7 +114,8 @@ class ASGIDispatch(AsyncDispatcher):
             except Exception as exc:
                 app_exc = exc
             finally:
-                await response_body.done()
+                await response_body.mark_as_done()
+                response_started_or_failed.set()
 
         # Really we'd like to push all `asyncio` logic into concurrency.py,
         # with a standardized interface, so that we can support other event
@@ -122,17 +124,13 @@ class ASGIDispatch(AsyncDispatcher):
         # `ConcurrencyBackend` with the `Client(app=asgi_app)` case.
         loop = asyncio.get_event_loop()
         app_task = loop.create_task(run_app())
-        response_task = loop.create_task(response_started.wait())
 
-        tasks = {app_task, response_task}  # type: typing.Set[asyncio.Task]
-
-        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+        await response_started_or_failed.wait()
 
         if app_exc is not None and self.raise_app_exceptions:
             raise app_exc
 
-        assert response_started.is_set(), "application did not return a response."
-        assert status_code is not None
+        assert status_code is not None, "application did not return a response."
         assert headers is not None
 
         async def on_close() -> None:
@@ -189,7 +187,7 @@ class BodyIterator:
         """
         await self._queue.put(data)
 
-    async def done(self) -> None:
+    async def mark_as_done(self) -> None:
         """
         Used by the server to signal the end of the response body.
         """