from ..interfaces import (
BaseBackgroundManager,
BasePoolSemaphore,
+ BaseQueue,
BaseReader,
BaseWriter,
ConcurrencyBackend,
def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
return PoolSemaphore(limits)
+ def create_queue(self, max_size: int) -> BaseQueue:
+ return typing.cast(BaseQueue, asyncio.Queue(maxsize=max_size))
+
def background_manager(
self, coroutine: typing.Callable, args: typing.Any
) -> "BackgroundManager":
import asyncio
import typing
+from ..concurrency.asyncio import AsyncioBackend
from ..config import CertTypes, TimeoutTypes, VerifyTypes
-from ..interfaces import AsyncDispatcher
+from ..interfaces import AsyncDispatcher, ConcurrencyBackend
from ..models import AsyncRequest, AsyncResponse
self.raise_app_exceptions = raise_app_exceptions
self.root_path = root_path
self.client = client
+ # This will need to be turned into a parameter on this class at some point.
+ self.backend: ConcurrencyBackend = AsyncioBackend()
async def send(
self,
status_code = None
headers = None
response_started_or_failed = asyncio.Event()
- response_body = BodyIterator()
+ response_body = BodyIterator(self.backend)
request_stream = request.stream()
async def receive() -> dict:
ingest the response content from.
"""
- def __init__(self) -> None:
- self._queue = asyncio.Queue(
- maxsize=1
- ) # type: asyncio.Queue[typing.Union[bytes, object]]
+ def __init__(self, backend: ConcurrencyBackend) -> None:
+ self._queue = backend.create_queue(max_size=1)
self._done = object()
async def iterate(self) -> typing.AsyncIterator[bytes]:
raise NotImplementedError() # pragma: no cover
+class BaseQueue:
+ """
+ A FIFO queue. Abstracts away any asyncio-specific interfaces.
+ """
+
+ async def get(self) -> typing.Any:
+ raise NotImplementedError() # pragma: no cover
+
+ async def put(self, value: typing.Any) -> None:
+ raise NotImplementedError() # pragma: no cover
+
+
class BasePoolSemaphore:
"""
A semaphore for use with connection pooling.
except StopAsyncIteration:
break
+ def create_queue(self, max_size: int) -> BaseQueue:
+ raise NotImplementedError() # pragma: no cover
+
def background_manager(
self, coroutine: typing.Callable, args: typing.Any
) -> "BaseBackgroundManager":