from .config import PoolLimits, SSLConfig, TimeoutConfig
from .datastructures import URL, Request, Response
-from .exceptions import ResponseClosed, StreamConsumed
+from .exceptions import (
+ ConnectTimeout,
+ PoolTimeout,
+ ReadTimeout,
+ ResponseClosed,
+ StreamConsumed,
+ Timeout,
+)
from .pool import ConnectionPool
__version__ = "0.0.3"
try:
import brotli
-except ImportError:
- brotli = None # pragma: nocover
+except ImportError: # pragma: nocover
+ brotli = None
)
from .connections import Connection
from .datastructures import URL, Request, Response
+from .exceptions import PoolTimeout
ConnectionKey = typing.Tuple[str, str, int] # (scheme, host, port)
class ConnectionSemaphore:
- def __init__(self, max_connections: int = None):
+ def __init__(self, max_connections: int = None, timeout: float = None):
+ self.timeout = timeout
if max_connections is not None:
self.semaphore = asyncio.BoundedSemaphore(value=max_connections)
async def acquire(self) -> None:
if hasattr(self, "semaphore"):
- await self.semaphore.acquire()
+ try:
+ await asyncio.wait_for(self.semaphore.acquire(), self.timeout)
+ except asyncio.TimeoutError:
+ raise PoolTimeout()
def release(self) -> None:
if hasattr(self, "semaphore"):
{}
) # type: typing.Dict[ConnectionKey, typing.List[Connection]]
self._connection_semaphore = ConnectionSemaphore(
- max_connections=self.limits.hard_limit
+ max_connections=self.limits.hard_limit, timeout=self.timeout.pool_timeout
)
async def request(
async def app(scope, receive, send):
assert scope["type"] == "http"
+ if scope["path"] == "/slow_response":
+ await slow_response(scope, receive, send)
+ else:
+ await hello_world(scope, receive, send)
+
+
+async def hello_world(scope, receive, send):
+ await send(
+ {
+ "type": "http.response.start",
+ "status": 200,
+ "headers": [[b"content-type", b"text/plain"]],
+ }
+ )
+ await send({"type": "http.response.body", "body": b"Hello, world!"})
+
+
+async def slow_response(scope, receive, send):
+ await asyncio.sleep(0.01)
await send(
{
"type": "http.response.start",
yield server
finally:
server.should_exit = True
- server.force_exit = True
await task
--- /dev/null
+import pytest
+
+import httpcore
+
+
+@pytest.mark.asyncio
+async def test_read_timeout(server):
+ timeout = httpcore.TimeoutConfig(read_timeout=0.0001)
+
+ async with httpcore.ConnectionPool(timeout=timeout) as http:
+ with pytest.raises(httpcore.ReadTimeout):
+ await http.request("GET", "http://127.0.0.1:8000/slow_response")
+
+
+@pytest.mark.asyncio
+async def test_pool_timeout(server):
+ timeout = httpcore.TimeoutConfig(pool_timeout=0.0001)
+ limits = httpcore.PoolLimits(hard_limit=1)
+
+ async with httpcore.ConnectionPool(timeout=timeout, limits=limits) as http:
+ response = await http.request("GET", "http://127.0.0.1:8000/", stream=True)
+
+ with pytest.raises(httpcore.PoolTimeout):
+ await http.request("GET", "http://localhost:8000/")
+
+ await response.read()