CertTypes = typing.Union[str, typing.Tuple[str, str], typing.Tuple[str, str, str]]
VerifyTypes = typing.Union[str, bool, ssl.SSLContext]
-TimeoutTypes = typing.Union[float, typing.Tuple[float, float, float], "TimeoutConfig"]
+TimeoutTypes = typing.Union[
+ float, typing.Tuple[float, float, float, float], "TimeoutConfig"
+]
HTTPVersionTypes = typing.Union[
str, typing.List[str], typing.Tuple[str], "HTTPVersionConfig"
]
connect_timeout: float = None,
read_timeout: float = None,
write_timeout: float = None,
+ pool_timeout: float = None,
):
if timeout is None:
self.connect_timeout = connect_timeout
self.read_timeout = read_timeout
self.write_timeout = write_timeout
+ self.pool_timeout = pool_timeout
else:
# Specified as a single timeout value
assert connect_timeout is None
assert read_timeout is None
assert write_timeout is None
+ assert pool_timeout is None
if isinstance(timeout, TimeoutConfig):
self.connect_timeout = timeout.connect_timeout
self.read_timeout = timeout.read_timeout
self.write_timeout = timeout.write_timeout
+ self.pool_timeout = timeout.pool_timeout
elif isinstance(timeout, tuple):
self.connect_timeout = timeout[0]
self.read_timeout = timeout[1]
- self.write_timeout = timeout[2]
+ self.write_timeout = None if len(timeout) < 3 else timeout[2]
+ self.pool_timeout = None if len(timeout) < 4 else timeout[3]
else:
self.connect_timeout = timeout
self.read_timeout = timeout
self.write_timeout = timeout
+ self.pool_timeout = timeout
def __eq__(self, other: typing.Any) -> bool:
return (
and self.connect_timeout == other.connect_timeout
and self.read_timeout == other.read_timeout
and self.write_timeout == other.write_timeout
+ and self.pool_timeout == other.pool_timeout
)
def __repr__(self) -> str:
class_name = self.__class__.__name__
- if len({self.connect_timeout, self.read_timeout, self.write_timeout}) == 1:
+ if (
+ len(
+ {
+ self.connect_timeout,
+ self.read_timeout,
+ self.write_timeout,
+ self.pool_timeout,
+ }
+ )
+ == 1
+ ):
return f"{class_name}(timeout={self.connect_timeout})"
return (
f"{class_name}(connect_timeout={self.connect_timeout}, "
- f"read_timeout={self.read_timeout}, write_timeout={self.write_timeout})"
+ f"read_timeout={self.read_timeout}, write_timeout={self.write_timeout}, "
+ f"pool_timeout={self.pool_timeout})"
)
"""
def __init__(
- self,
- *,
- soft_limit: int = None,
- hard_limit: int = None,
- pool_timeout: float = None,
+ self, *, soft_limit: int = None, hard_limit: int = None,
):
self.soft_limit = soft_limit
self.hard_limit = hard_limit
- self.pool_timeout = pool_timeout
def __eq__(self, other: typing.Any) -> bool:
return (
isinstance(other, self.__class__)
and self.soft_limit == other.soft_limit
and self.hard_limit == other.hard_limit
- and self.pool_timeout == other.pool_timeout
)
def __repr__(self) -> str:
class_name = self.__class__.__name__
return (
- f"{class_name}(soft_limit={self.soft_limit}, "
- f"hard_limit={self.hard_limit}, pool_timeout={self.pool_timeout})"
+ f"{class_name}(soft_limit={self.soft_limit}, hard_limit={self.hard_limit})"
)
DEFAULT_SSL_CONFIG = SSLConfig(cert=None, verify=True)
DEFAULT_TIMEOUT_CONFIG = TimeoutConfig(timeout=5.0)
-DEFAULT_POOL_LIMITS = PoolLimits(soft_limit=10, hard_limit=100, pool_timeout=5.0)
+DEFAULT_POOL_LIMITS = PoolLimits(soft_limit=10, hard_limit=100)
DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())
DEFAULT_MAX_REDIRECTS = 20
CertTypes,
HTTPVersionTypes,
PoolLimits,
+ TimeoutConfig,
TimeoutTypes,
VerifyTypes,
)
):
self.verify = verify
self.cert = cert
- self.timeout = timeout
+ self.timeout = TimeoutConfig(timeout)
self.pool_limits = pool_limits
self.http_versions = http_versions
self.is_closed = False
cert: CertTypes = None,
timeout: TimeoutTypes = None,
) -> Response:
- connection = await self.acquire_connection(origin=request.url.origin)
+ connection = await self.acquire_connection(
+ origin=request.url.origin, timeout=timeout
+ )
try:
response = await connection.send(
request, verify=verify, cert=cert, timeout=timeout
return response
- async def acquire_connection(self, origin: Origin) -> HTTPConnection:
+ async def acquire_connection(
+ self, origin: Origin, timeout: TimeoutTypes = None
+ ) -> HTTPConnection:
logger.trace(f"acquire_connection origin={origin!r}")
connection = self.pop_connection(origin)
if connection is None:
- await self.max_connections.acquire()
+ if timeout is None:
+ pool_timeout = self.timeout.pool_timeout
+ else:
+ pool_timeout = TimeoutConfig(timeout).pool_timeout
+
+ await self.max_connections.acquire(timeout=pool_timeout)
connection = HTTPConnection(
origin,
verify=self.verify,
def test_limits_repr():
limits = httpx.PoolLimits(hard_limit=100)
- assert (
- repr(limits) == "PoolLimits(soft_limit=None, hard_limit=100, pool_timeout=None)"
- )
+ assert repr(limits) == "PoolLimits(soft_limit=None, hard_limit=100)"
def test_ssl_eq():
assert timeout.connect_timeout is None
assert timeout.read_timeout is None
assert timeout.write_timeout is None
+ assert timeout.pool_timeout is None
def test_timeout_from_none():
def test_timeout_from_tuple():
- timeout = httpx.TimeoutConfig(timeout=(5.0, 5.0, 5.0))
+ timeout = httpx.TimeoutConfig(timeout=(5.0, 5.0, 5.0, 5.0))
assert timeout == httpx.TimeoutConfig(timeout=5.0)
assert repr(timeout) == "TimeoutConfig(timeout=5.0)"
timeout = httpx.TimeoutConfig(read_timeout=5.0)
- assert (
- repr(timeout)
- == "TimeoutConfig(connect_timeout=None, read_timeout=5.0, write_timeout=None)"
+ assert repr(timeout) == (
+ "TimeoutConfig(connect_timeout=None, read_timeout=5.0, "
+ "write_timeout=None, pool_timeout=None)"
)