]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Add BaseQueue interface (#257)
authorFlorimond Manca <florimond.manca@gmail.com>
Tue, 20 Aug 2019 22:25:46 +0000 (00:25 +0200)
committerGitHub <noreply@github.com>
Tue, 20 Aug 2019 22:25:46 +0000 (00:25 +0200)
httpx/concurrency/asyncio.py
httpx/dispatch/asgi.py
httpx/interfaces.py

index d8421f7185df964b823b6e327da6642141e2f3f9..08e39839ec9d6cef045968dea510a5add33686cf 100644 (file)
@@ -19,6 +19,7 @@ from ..exceptions import ConnectTimeout, PoolTimeout, ReadTimeout, WriteTimeout
 from ..interfaces import (
     BaseBackgroundManager,
     BasePoolSemaphore,
+    BaseQueue,
     BaseReader,
     BaseWriter,
     ConcurrencyBackend,
@@ -246,6 +247,9 @@ class AsyncioBackend(ConcurrencyBackend):
     def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
         return PoolSemaphore(limits)
 
+    def create_queue(self, max_size: int) -> BaseQueue:
+        return typing.cast(BaseQueue, asyncio.Queue(maxsize=max_size))
+
     def background_manager(
         self, coroutine: typing.Callable, args: typing.Any
     ) -> "BackgroundManager":
index 01313fedf4a2116537c7ca525093c1342fc04b8c..5b6bcad6d493e31270846bbe5dd2fb40bc5ac81b 100644 (file)
@@ -1,8 +1,9 @@
 import asyncio
 import typing
 
+from ..concurrency.asyncio import AsyncioBackend
 from ..config import CertTypes, TimeoutTypes, VerifyTypes
-from ..interfaces import AsyncDispatcher
+from ..interfaces import AsyncDispatcher, ConcurrencyBackend
 from ..models import AsyncRequest, AsyncResponse
 
 
@@ -52,6 +53,8 @@ class ASGIDispatch(AsyncDispatcher):
         self.raise_app_exceptions = raise_app_exceptions
         self.root_path = root_path
         self.client = client
+        # This will need to be turned into a parameter on this class at some point.
+        self.backend: ConcurrencyBackend = AsyncioBackend()
 
     async def send(
         self,
@@ -79,7 +82,7 @@ class ASGIDispatch(AsyncDispatcher):
         status_code = None
         headers = None
         response_started_or_failed = asyncio.Event()
-        response_body = BodyIterator()
+        response_body = BodyIterator(self.backend)
         request_stream = request.stream()
 
         async def receive() -> dict:
@@ -156,10 +159,8 @@ class BodyIterator:
     ingest the response content from.
     """
 
-    def __init__(self) -> None:
-        self._queue = asyncio.Queue(
-            maxsize=1
-        )  # type: asyncio.Queue[typing.Union[bytes, object]]
+    def __init__(self, backend: ConcurrencyBackend) -> None:
+        self._queue = backend.create_queue(max_size=1)
         self._done = object()
 
     async def iterate(self) -> typing.AsyncIterator[bytes]:
index 6a39175f3e4de08bc3ac3f5a1eb988eb3620ab5c..cdfc0726f3d5de79b9955189da541487e60b68c4 100644 (file)
@@ -145,6 +145,18 @@ class BaseWriter:
         raise NotImplementedError()  # pragma: no cover
 
 
+class BaseQueue:
+    """
+    A FIFO queue. Abstracts away any asyncio-specific interfaces.
+    """
+
+    async def get(self) -> typing.Any:
+        raise NotImplementedError()  # pragma: no cover
+
+    async def put(self, value: typing.Any) -> None:
+        raise NotImplementedError()  # pragma: no cover
+
+
 class BasePoolSemaphore:
     """
     A semaphore for use with connection pooling.
@@ -205,6 +217,9 @@ class ConcurrencyBackend:
             except StopAsyncIteration:
                 break
 
+    def create_queue(self, max_size: int) -> BaseQueue:
+        raise NotImplementedError()  # pragma: no cover
+
     def background_manager(
         self, coroutine: typing.Callable, args: typing.Any
     ) -> "BaseBackgroundManager":