]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Simplify ASGI dispatch (#560)
authorTom Christie <tom@tomchristie.com>
Fri, 29 Nov 2019 09:07:53 +0000 (09:07 +0000)
committerGitHub <noreply@github.com>
Fri, 29 Nov 2019 09:07:53 +0000 (09:07 +0000)
* Simplify ASGI dispatch

* Blackify

* Linting

httpx/dispatch/asgi.py

index ab640b86e037e16617cbb6761298a4c8aa7c5258..f9cbb35ffd716c5e4209fde44b701c94c6823d7f 100644 (file)
@@ -82,12 +82,12 @@ class ASGIDispatch(Dispatcher):
             "root_path": self.root_path,
         }
         app = MessageLoggerASGIMiddleware(self.app, logger=logger)
-        app_exc = None
         status_code = None
         headers = None
-        response_started_or_failed = self.backend.create_event()
-        response_body = BodyIterator(self.backend)
+        body_parts = []
         request_stream = request.stream()
+        response_started = False
+        response_complete = False
 
         async def receive() -> dict:
             try:
@@ -97,102 +97,41 @@ class ASGIDispatch(Dispatcher):
             return {"type": "http.request", "body": body, "more_body": True}
 
         async def send(message: dict) -> None:
-            nonlocal status_code, headers
+            nonlocal status_code, headers, body_parts
+            nonlocal response_started, response_complete
 
             if message["type"] == "http.response.start":
+                assert not response_started
+
                 status_code = message["status"]
                 headers = message.get("headers", [])
-                response_started_or_failed.set()
+                response_started = True
 
             elif message["type"] == "http.response.body":
+                assert not response_complete
                 body = message.get("body", b"")
                 more_body = message.get("more_body", False)
 
                 if body and request.method != "HEAD":
-                    await response_body.put(body)
+                    body_parts.append(body)
 
                 if not more_body:
-                    await response_body.mark_as_done()
+                    response_complete = True
 
-        async def run_app() -> None:
-            nonlocal app_exc
-            try:
-                await app(scope, receive, send)
-            except Exception as exc:
-                app_exc = exc
-            finally:
-                await response_body.mark_as_done()
-                response_started_or_failed.set()
-
-        # Using the background manager here *works*, but it is weak design because
-        # the background task isn't strictly context-managed.
-        # We could consider refactoring the other uses of this abstraction
-        # (mainly sending/receiving request/response data in h11 and h2 dispatchers),
-        # and see if that allows us to come back here and refactor things out.
-        background = await self.backend.background_manager(run_app).__aenter__()
-
-        await response_started_or_failed.wait()
-
-        if app_exc is not None and self.raise_app_exceptions:
-            await background.close(app_exc)
-            raise app_exc
-
-        assert status_code is not None, "application did not return a response."
-        assert headers is not None
+        try:
+            await app(scope, receive, send)
+        except Exception:
+            if self.raise_app_exceptions or not response_complete:
+                raise
 
-        async def on_close() -> None:
-            await response_body.drain()
-            await background.close(app_exc)
-            if app_exc is not None and self.raise_app_exceptions:
-                raise app_exc
+        assert response_complete
+        assert status_code is not None
+        assert headers is not None
 
         return Response(
             status_code=status_code,
             http_version="HTTP/1.1",
             headers=headers,
-            content=response_body.iterate(),
-            on_close=on_close,
+            content=b"".join(body_parts),
             request=request,
         )
-
-
-class BodyIterator:
-    """
-    Provides a byte-iterator interface that the client can use to
-    ingest the response content from.
-    """
-
-    def __init__(self, backend: ConcurrencyBackend) -> None:
-        self._queue = backend.create_queue(max_size=1)
-        self._done = object()
-
-    async def iterate(self) -> typing.AsyncIterator[bytes]:
-        """
-        A byte-iterator, used by the client to consume the response body.
-        """
-        while True:
-            data = await self._queue.get()
-            if data is self._done:
-                break
-            assert isinstance(data, bytes)
-            yield data
-
-    async def drain(self) -> None:
-        """
-        Drain any remaining body, in order to allow any blocked `put()` calls
-        to complete.
-        """
-        async for chunk in self.iterate():
-            pass  # pragma: no cover
-
-    async def put(self, data: bytes) -> None:
-        """
-        Used by the server to add data to the response body.
-        """
-        await self._queue.put(data)
-
-    async def mark_as_done(self) -> None:
-        """
-        Used by the server to signal the end of the response body.
-        """
-        await self._queue.put(self._done)