return typing.cast(BaseEvent, asyncio.Event())
def background_manager(
- self, coroutine: typing.Callable, args: typing.Any
+ self, coroutine: typing.Callable, *args: typing.Any
) -> "BackgroundManager":
return BackgroundManager(coroutine, args)
raise NotImplementedError() # pragma: no cover
def background_manager(
- self, coroutine: typing.Callable, args: typing.Any
+ self, coroutine: typing.Callable, *args: typing.Any
) -> "BaseBackgroundManager":
raise NotImplementedError() # pragma: no cover
-import asyncio
import typing
from .base import AsyncDispatcher
await response_body.mark_as_done()
response_started_or_failed.set()
- # Really we'd like to push all `asyncio` logic into concurrency.py,
- # with a standardized interface, so that we can support other event
- # loop implementations, such as Trio and Curio.
- # That's a bit fiddly here, so we're not yet supporting using a custom
- # `ConcurrencyBackend` with the `Client(app=asgi_app)` case.
- loop = asyncio.get_event_loop()
- app_task = loop.create_task(run_app())
+ # Using the background manager here *works*, but it is weak design because
+ # the background task isn't strictly context-managed.
+ # We could consider refactoring the other uses of this abstraction
+ # (mainly sending/receiving request/response data in h11 and h2 dispatchers),
+ # and see if that allows us to come back here and refactor things out.
+ background = await self.backend.background_manager(run_app).__aenter__()
await response_started_or_failed.wait()
assert headers is not None
async def on_close() -> None:
- nonlocal app_task, response_body
+ nonlocal response_body
await response_body.drain()
- await app_task
+ await background.__aexit__(None, None, None)
if app_exc is not None and self.raise_app_exceptions:
raise app_exc
await self._send_request(request, timeout)
task, args = self._send_request_data, [request.stream(), timeout]
- async with self.backend.background_manager(task, args=args):
+ async with self.backend.background_manager(task, *args):
http_version, status_code, headers = await self._receive_response(timeout)
content = self._receive_response_data(timeout)
self.timeout_flags[stream_id] = TimeoutFlag()
task, args = self.send_request_data, [stream_id, request.stream(), timeout]
- async with self.backend.background_manager(task, args=args):
+ async with self.backend.background_manager(task, *args):
status_code, headers = await self.receive_response(stream_id, timeout)
content = self.body_iter(stream_id, timeout)
on_close = functools.partial(self.response_closed, stream_id=stream_id)