"root_path": self.root_path,
}
app = MessageLoggerASGIMiddleware(self.app, logger=logger)
- app_exc = None
status_code = None
headers = None
- response_started_or_failed = self.backend.create_event()
- response_body = BodyIterator(self.backend)
+ body_parts = []
request_stream = request.stream()
+ response_started = False
+ response_complete = False
async def receive() -> dict:
try:
return {"type": "http.request", "body": body, "more_body": True}
async def send(message: dict) -> None:
- nonlocal status_code, headers
+ nonlocal status_code, headers, body_parts
+ nonlocal response_started, response_complete
if message["type"] == "http.response.start":
+ assert not response_started
+
status_code = message["status"]
headers = message.get("headers", [])
- response_started_or_failed.set()
+ response_started = True
elif message["type"] == "http.response.body":
+ assert not response_complete
body = message.get("body", b"")
more_body = message.get("more_body", False)
if body and request.method != "HEAD":
- await response_body.put(body)
+ body_parts.append(body)
if not more_body:
- await response_body.mark_as_done()
+ response_complete = True
- async def run_app() -> None:
- nonlocal app_exc
- try:
- await app(scope, receive, send)
- except Exception as exc:
- app_exc = exc
- finally:
- await response_body.mark_as_done()
- response_started_or_failed.set()
-
- # Using the background manager here *works*, but it is weak design because
- # the background task isn't strictly context-managed.
- # We could consider refactoring the other uses of this abstraction
- # (mainly sending/receiving request/response data in h11 and h2 dispatchers),
- # and see if that allows us to come back here and refactor things out.
- background = await self.backend.background_manager(run_app).__aenter__()
-
- await response_started_or_failed.wait()
-
- if app_exc is not None and self.raise_app_exceptions:
- await background.close(app_exc)
- raise app_exc
-
- assert status_code is not None, "application did not return a response."
- assert headers is not None
+ try:
+ await app(scope, receive, send)
+ except Exception:
+ if self.raise_app_exceptions or not response_complete:
+ raise
- async def on_close() -> None:
- await response_body.drain()
- await background.close(app_exc)
- if app_exc is not None and self.raise_app_exceptions:
- raise app_exc
+ assert response_complete
+ assert status_code is not None
+ assert headers is not None
return Response(
status_code=status_code,
http_version="HTTP/1.1",
headers=headers,
- content=response_body.iterate(),
- on_close=on_close,
+ content=b"".join(body_parts),
request=request,
)
-
-
-class BodyIterator:
- """
- Provides a byte-iterator interface that the client can use to
- ingest the response content from.
- """
-
- def __init__(self, backend: ConcurrencyBackend) -> None:
- self._queue = backend.create_queue(max_size=1)
- self._done = object()
-
- async def iterate(self) -> typing.AsyncIterator[bytes]:
- """
- A byte-iterator, used by the client to consume the response body.
- """
- while True:
- data = await self._queue.get()
- if data is self._done:
- break
- assert isinstance(data, bytes)
- yield data
-
- async def drain(self) -> None:
- """
- Drain any remaining body, in order to allow any blocked `put()` calls
- to complete.
- """
- async for chunk in self.iterate():
- pass # pragma: no cover
-
- async def put(self, data: bytes) -> None:
- """
- Used by the server to add data to the response body.
- """
- await self._queue.put(data)
-
- async def mark_as_done(self) -> None:
- """
- Used by the server to signal the end of the response body.
- """
- await self._queue.put(self._done)