From: Tom Christie Date: Fri, 29 Nov 2019 09:07:53 +0000 (+0000) Subject: Simplify ASGI dispatch (#560) X-Git-Tag: 0.9.0~45 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=44ad295572f7569f6b02c9868ee22fda594c21a4;p=thirdparty%2Fhttpx.git Simplify ASGI dispatch (#560) * Simplify ASGI dispatch * Blackify * Linting --- diff --git a/httpx/dispatch/asgi.py b/httpx/dispatch/asgi.py index ab640b86..f9cbb35f 100644 --- a/httpx/dispatch/asgi.py +++ b/httpx/dispatch/asgi.py @@ -82,12 +82,12 @@ class ASGIDispatch(Dispatcher): "root_path": self.root_path, } app = MessageLoggerASGIMiddleware(self.app, logger=logger) - app_exc = None status_code = None headers = None - response_started_or_failed = self.backend.create_event() - response_body = BodyIterator(self.backend) + body_parts = [] request_stream = request.stream() + response_started = False + response_complete = False async def receive() -> dict: try: @@ -97,102 +97,41 @@ class ASGIDispatch(Dispatcher): return {"type": "http.request", "body": body, "more_body": True} async def send(message: dict) -> None: - nonlocal status_code, headers + nonlocal status_code, headers, body_parts + nonlocal response_started, response_complete if message["type"] == "http.response.start": + assert not response_started + status_code = message["status"] headers = message.get("headers", []) - response_started_or_failed.set() + response_started = True elif message["type"] == "http.response.body": + assert not response_complete body = message.get("body", b"") more_body = message.get("more_body", False) if body and request.method != "HEAD": - await response_body.put(body) + body_parts.append(body) if not more_body: - await response_body.mark_as_done() + response_complete = True - async def run_app() -> None: - nonlocal app_exc - try: - await app(scope, receive, send) - except Exception as exc: - app_exc = exc - finally: - await response_body.mark_as_done() - response_started_or_failed.set() - - # Using the background manager here *works*, but it is weak design because - # the background task isn't strictly context-managed. - # We could consider refactoring the other uses of this abstraction - # (mainly sending/receiving request/response data in h11 and h2 dispatchers), - # and see if that allows us to come back here and refactor things out. - background = await self.backend.background_manager(run_app).__aenter__() - - await response_started_or_failed.wait() - - if app_exc is not None and self.raise_app_exceptions: - await background.close(app_exc) - raise app_exc - - assert status_code is not None, "application did not return a response." - assert headers is not None + try: + await app(scope, receive, send) + except Exception: + if self.raise_app_exceptions or not response_complete: + raise - async def on_close() -> None: - await response_body.drain() - await background.close(app_exc) - if app_exc is not None and self.raise_app_exceptions: - raise app_exc + assert response_complete + assert status_code is not None + assert headers is not None return Response( status_code=status_code, http_version="HTTP/1.1", headers=headers, - content=response_body.iterate(), - on_close=on_close, + content=b"".join(body_parts), request=request, ) - - -class BodyIterator: - """ - Provides a byte-iterator interface that the client can use to - ingest the response content from. - """ - - def __init__(self, backend: ConcurrencyBackend) -> None: - self._queue = backend.create_queue(max_size=1) - self._done = object() - - async def iterate(self) -> typing.AsyncIterator[bytes]: - """ - A byte-iterator, used by the client to consume the response body. - """ - while True: - data = await self._queue.get() - if data is self._done: - break - assert isinstance(data, bytes) - yield data - - async def drain(self) -> None: - """ - Drain any remaining body, in order to allow any blocked `put()` calls - to complete. - """ - async for chunk in self.iterate(): - pass # pragma: no cover - - async def put(self, data: bytes) -> None: - """ - Used by the server to add data to the response body. - """ - await self._queue.put(data) - - async def mark_as_done(self) -> None: - """ - Used by the server to signal the end of the response body. - """ - await self._queue.put(self._done)