from .base import (
BaseBackgroundManager,
BasePoolSemaphore,
+ BaseEvent,
BaseQueue,
BaseReader,
BaseWriter,
def create_queue(self, max_size: int) -> BaseQueue:
return typing.cast(BaseQueue, asyncio.Queue(maxsize=max_size))
+ def create_event(self) -> BaseEvent:
+ return typing.cast(BaseEvent, asyncio.Event())
+
def background_manager(
self, coroutine: typing.Callable, args: typing.Any
) -> "BackgroundManager":
raise NotImplementedError() # pragma: no cover
+class BaseEvent:
+ """
+ An event object. Abstracts away any asyncio-specific interfaces.
+ """
+
+ def set(self) -> None:
+ raise NotImplementedError() # pragma: no cover
+
+ def is_set(self) -> bool:
+ raise NotImplementedError() # pragma: no cover
+
+ async def wait(self) -> None:
+ raise NotImplementedError() # pragma: no cover
+
+
class BasePoolSemaphore:
"""
A semaphore for use with connection pooling.
def create_queue(self, max_size: int) -> BaseQueue:
raise NotImplementedError() # pragma: no cover
+ def create_event(self) -> BaseEvent:
+ raise NotImplementedError() # pragma: no cover
+
def background_manager(
self, coroutine: typing.Callable, args: typing.Any
) -> "BaseBackgroundManager":
app_exc = None
status_code = None
headers = None
- response_started_or_failed = asyncio.Event()
+ response_started_or_failed = self.backend.create_event()
response_body = BodyIterator(self.backend)
request_stream = request.stream()