Besides, you can pass timeouts in two forms:
- A number, which sets the read, write and connect timeouts to the same value, as in the examples above.
-- A `TimeoutConfig` instance, which allows to define the read, write and connect timeouts independently:
+- A `Timeout` instance, which allows to define the read, write and connect timeouts independently:
```python
-timeout = httpx.TimeoutConfig(
+timeout = httpx.Timeout(
connect_timeout=5,
read_timeout=10,
write_timeout=15
await client.get(url) # Does not timeout, returns after 10s
-timeout = httpx.TimeoutConfig(
+timeout = httpx.Timeout(
connect_timeout=5,
read_timeout=None,
write_timeout=5
TRACE [2019-11-06 19:18:56] httpx.dispatch.connection_pool - new_connection connection=HTTPConnection(origin=Origin(scheme='https' host='google.com' port=443))
TRACE [2019-11-06 19:18:56] httpx.config - load_ssl_context verify=True cert=None trust_env=True http_versions=HTTPVersionConfig(['HTTP/1.1', 'HTTP/2'])
TRACE [2019-11-06 19:18:56] httpx.config - load_verify_locations cafile=/Users/florimond/Developer/python-projects/httpx/venv/lib/python3.8/site-packages/certifi/cacert.pem
-TRACE [2019-11-06 19:18:56] httpx.dispatch.connection - start_connect host='google.com' port=443 timeout=TimeoutConfig(timeout=5.0)
+TRACE [2019-11-06 19:18:56] httpx.dispatch.connection - start_connect host='google.com' port=443 timeout=Timeout(timeout=5.0)
TRACE [2019-11-06 19:18:56] httpx.dispatch.connection - connected http_version='HTTP/2'
TRACE [2019-11-06 19:18:56] httpx.dispatch.http2 - send_headers stream_id=1 method='GET' target='/' headers=[(b':method', b'GET'), (b':authority', b'google.com'), (b':scheme', b'https'), (b':path', b'/'), (b'user-agent', b'python-httpx/0.7.6'), (b'accept', b'*/*'), (b'accept-encoding', b'gzip, deflate, br'), (b'connection', b'keep-alive')]
TRACE [2019-11-06 19:18:56] httpx.dispatch.http2 - end_stream stream_id=1
TRACE [2019-11-06 19:18:56] httpx.dispatch.connection_pool - new_connection connection=HTTPConnection(origin=Origin(scheme='https' host='www.google.com' port=443))
TRACE [2019-11-06 19:18:56] httpx.config - load_ssl_context verify=True cert=None trust_env=True http_versions=HTTPVersionConfig(['HTTP/1.1', 'HTTP/2'])
TRACE [2019-11-06 19:18:56] httpx.config - load_verify_locations cafile=/Users/florimond/Developer/python-projects/httpx/venv/lib/python3.8/site-packages/certifi/cacert.pem
-TRACE [2019-11-06 19:18:56] httpx.dispatch.connection - start_connect host='www.google.com' port=443 timeout=TimeoutConfig(timeout=5.0)
+TRACE [2019-11-06 19:18:56] httpx.dispatch.connection - start_connect host='www.google.com' port=443 timeout=Timeout(timeout=5.0)
TRACE [2019-11-06 19:18:56] httpx.dispatch.connection - connected http_version='HTTP/2'
TRACE [2019-11-06 19:18:56] httpx.dispatch.http2 - send_headers stream_id=1 method='GET' target='/' headers=[(b':method', b'GET'), (b':authority', b'www.google.com'), (b':scheme', b'https'), (b':path', b'/'), (b'user-agent', b'python-httpx/0.7.6'), (b'accept', b'*/*'), (b'accept-encoding', b'gzip, deflate, br'), (b'connection', b'keep-alive')]
TRACE [2019-11-06 19:18:56] httpx.dispatch.http2 - end_stream stream_id=1
CertTypes,
PoolLimits,
SSLConfig,
+ Timeout,
TimeoutConfig,
TimeoutTypes,
VerifyTypes,
ReadTimeout,
RedirectBodyUnavailable,
RedirectLoop,
+ RequestTimeout,
ResponseClosed,
ResponseNotRead,
StreamConsumed,
- Timeout,
TooManyRedirects,
WriteTimeout,
)
"CertTypes",
"PoolLimits",
"SSLConfig",
+ "Timeout",
"TimeoutConfig",
"VerifyTypes",
"HTTPConnection",
"ResponseNotRead",
"StreamConsumed",
"ProxyError",
- "Timeout",
"TooManyRedirects",
"WriteTimeout",
"BaseSocketStream",
"QueryParamTypes",
"Request",
"RequestData",
+ "RequestTimeout",
"Response",
"ResponseContent",
"RequestFiles",
import typing
from types import TracebackType
-from ..config import PoolLimits, TimeoutConfig
+from ..config import PoolLimits, Timeout
from ..exceptions import ConnectTimeout, PoolTimeout, ReadTimeout, WriteTimeout
from .base import (
BaseBackgroundManager,
self,
stream_reader: asyncio.StreamReader,
stream_writer: asyncio.StreamWriter,
- timeout: TimeoutConfig,
+ timeout: Timeout,
):
self.stream_reader = stream_reader
self.stream_writer = stream_writer
self._inner: typing.Optional[SocketStream] = None
async def start_tls(
- self, hostname: str, ssl_context: ssl.SSLContext, timeout: TimeoutConfig
+ self, hostname: str, ssl_context: ssl.SSLContext, timeout: Timeout
) -> "SocketStream":
loop = asyncio.get_event_loop()
return "HTTP/2" if ident == "h2" else "HTTP/1.1"
async def read(
- self, n: int, timeout: TimeoutConfig = None, flag: TimeoutFlag = None
+ self, n: int, timeout: Timeout = None, flag: TimeoutFlag = None
) -> bytes:
if timeout is None:
timeout = self.timeout
self.stream_writer.write(data) # pragma: nocover
async def write(
- self, data: bytes, timeout: TimeoutConfig = None, flag: TimeoutFlag = None
+ self, data: bytes, timeout: Timeout = None, flag: TimeoutFlag = None
) -> None:
if not data:
return
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> SocketStream:
try:
stream_reader, stream_writer = await asyncio.wait_for( # type: ignore
path: str,
hostname: typing.Optional[str],
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> SocketStream:
server_hostname = hostname if ssl_context else None
import sniffio
-from ..config import PoolLimits, TimeoutConfig
+from ..config import PoolLimits, Timeout
from .base import (
BaseBackgroundManager,
BaseEvent,
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> BaseSocketStream:
return await self.backend.open_tcp_stream(hostname, port, ssl_context, timeout)
path: str,
hostname: typing.Optional[str],
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> BaseSocketStream:
return await self.backend.open_uds_stream(path, hostname, ssl_context, timeout)
import typing
from types import TracebackType
-from ..config import PoolLimits, TimeoutConfig
+from ..config import PoolLimits, Timeout
def lookup_backend(
raise NotImplementedError() # pragma: no cover
async def start_tls(
- self, hostname: str, ssl_context: ssl.SSLContext, timeout: TimeoutConfig
+ self, hostname: str, ssl_context: ssl.SSLContext, timeout: Timeout
) -> "BaseSocketStream":
raise NotImplementedError() # pragma: no cover
async def read(
- self, n: int, timeout: TimeoutConfig = None, flag: typing.Any = None
+ self, n: int, timeout: Timeout = None, flag: typing.Any = None
) -> bytes:
raise NotImplementedError() # pragma: no cover
def write_no_block(self, data: bytes) -> None:
raise NotImplementedError() # pragma: no cover
- async def write(self, data: bytes, timeout: TimeoutConfig = None) -> None:
+ async def write(self, data: bytes, timeout: Timeout = None) -> None:
raise NotImplementedError() # pragma: no cover
async def close(self) -> None:
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> BaseSocketStream:
raise NotImplementedError() # pragma: no cover
path: str,
hostname: typing.Optional[str],
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> BaseSocketStream:
raise NotImplementedError() # pragma: no cover
import trio
-from ..config import PoolLimits, TimeoutConfig
+from ..config import PoolLimits, Timeout
from ..exceptions import ConnectTimeout, PoolTimeout, ReadTimeout, WriteTimeout
from .base import (
BaseBackgroundManager,
class SocketStream(BaseSocketStream):
def __init__(
- self,
- stream: typing.Union[trio.SocketStream, trio.SSLStream],
- timeout: TimeoutConfig,
+ self, stream: typing.Union[trio.SocketStream, trio.SSLStream], timeout: Timeout,
) -> None:
self.stream = stream
self.timeout = timeout
self.write_lock = trio.Lock()
async def start_tls(
- self, hostname: str, ssl_context: ssl.SSLContext, timeout: TimeoutConfig
+ self, hostname: str, ssl_context: ssl.SSLContext, timeout: Timeout
) -> "SocketStream":
# Check that the write buffer is empty. We should never start a TLS stream
# while there is still pending data to write.
return "HTTP/2" if ident == "h2" else "HTTP/1.1"
async def read(
- self, n: int, timeout: TimeoutConfig = None, flag: TimeoutFlag = None
+ self, n: int, timeout: Timeout = None, flag: TimeoutFlag = None
) -> bytes:
if timeout is None:
timeout = self.timeout
self.write_buffer += data # pragma: no cover
async def write(
- self, data: bytes, timeout: TimeoutConfig = None, flag: TimeoutFlag = None
+ self, data: bytes, timeout: Timeout = None, flag: TimeoutFlag = None
) -> None:
if self.write_buffer:
previous_data = self.write_buffer
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> SocketStream:
connect_timeout = _or_inf(timeout.connect_timeout)
path: str,
hostname: typing.Optional[str],
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> SocketStream:
connect_timeout = _or_inf(timeout.connect_timeout)
CertTypes = typing.Union[str, typing.Tuple[str, str], typing.Tuple[str, str, str]]
VerifyTypes = typing.Union[str, bool, ssl.SSLContext]
-TimeoutTypes = typing.Union[
- float, typing.Tuple[float, float, float, float], "TimeoutConfig"
-]
+TimeoutTypes = typing.Union[float, typing.Tuple[float, float, float, float], "Timeout"]
USER_AGENT = f"python-httpx/{__version__}"
)
-class TimeoutConfig:
+class Timeout:
"""
Timeout values.
"""
assert read_timeout is None
assert write_timeout is None
assert pool_timeout is None
- if isinstance(timeout, TimeoutConfig):
+ if isinstance(timeout, Timeout):
self.connect_timeout = timeout.connect_timeout
self.read_timeout = timeout.read_timeout
self.write_timeout = timeout.write_timeout
)
+TimeoutConfig = Timeout # Synonym for backwards compat
+
+
DEFAULT_SSL_CONFIG = SSLConfig(cert=None, verify=True)
-DEFAULT_TIMEOUT_CONFIG = TimeoutConfig(timeout=5.0)
+DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0)
DEFAULT_POOL_LIMITS = PoolLimits(soft_limit=10, hard_limit=100)
DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())
DEFAULT_MAX_REDIRECTS = 20
DEFAULT_TIMEOUT_CONFIG,
CertTypes,
SSLConfig,
- TimeoutConfig,
+ Timeout,
TimeoutTypes,
VerifyTypes,
)
):
self.origin = Origin(origin) if isinstance(origin, str) else origin
self.ssl = SSLConfig(cert=cert, verify=verify, trust_env=trust_env)
- self.timeout = TimeoutConfig(timeout)
+ self.timeout = Timeout(timeout)
self.http2 = http2
self.backend = lookup_backend(backend)
self.release_func = release_func
timeout: TimeoutTypes = None,
) -> None:
ssl = self.ssl.with_overrides(verify=verify, cert=cert)
- timeout = self.timeout if timeout is None else TimeoutConfig(timeout)
+ timeout = self.timeout if timeout is None else Timeout(timeout)
host = self.origin.host
port = self.origin.port
DEFAULT_TIMEOUT_CONFIG,
CertTypes,
PoolLimits,
- TimeoutConfig,
+ Timeout,
TimeoutTypes,
VerifyTypes,
)
):
self.verify = verify
self.cert = cert
- self.timeout = TimeoutConfig(timeout)
+ self.timeout = Timeout(timeout)
self.pool_limits = pool_limits
self.http2 = http2
self.is_closed = False
if timeout is None:
pool_timeout = self.timeout.pool_timeout
else:
- pool_timeout = TimeoutConfig(timeout).pool_timeout
+ pool_timeout = Timeout(timeout).pool_timeout
await self.max_connections.acquire(timeout=pool_timeout)
connection = HTTPConnection(
import h11
from ..concurrency.base import BaseSocketStream, TimeoutFlag
-from ..config import TimeoutConfig, TimeoutTypes
+from ..config import Timeout, TimeoutTypes
from ..exceptions import ConnectionClosed, ProtocolError
from ..models import Request, Response
from ..utils import get_logger
self.timeout_flag = TimeoutFlag()
async def send(self, request: Request, timeout: TimeoutTypes = None) -> Response:
- timeout = None if timeout is None else TimeoutConfig(timeout)
+ timeout = None if timeout is None else Timeout(timeout)
await self._send_request(request, timeout)
await self._send_request_body(request, timeout)
pass
await self.stream.close()
- async def _send_request(
- self, request: Request, timeout: TimeoutConfig = None
- ) -> None:
+ async def _send_request(self, request: Request, timeout: Timeout = None) -> None:
"""
Send the request method, URL, and headers to the network.
"""
await self._send_event(event, timeout)
async def _send_request_body(
- self, request: Request, timeout: TimeoutConfig = None
+ self, request: Request, timeout: Timeout = None
) -> None:
"""
Send the request body to the network.
# Once we've sent the request, we enable read timeouts.
self.timeout_flag.set_read_timeouts()
- async def _send_event(self, event: H11Event, timeout: TimeoutConfig = None) -> None:
+ async def _send_event(self, event: H11Event, timeout: Timeout = None) -> None:
"""
Send a single `h11` event to the network, waiting for the data to
drain before returning.
await self.stream.write(bytes_to_send, timeout)
async def _receive_response(
- self, timeout: TimeoutConfig = None
+ self, timeout: Timeout = None
) -> typing.Tuple[str, int, typing.List[typing.Tuple[bytes, bytes]]]:
"""
Read the response status and headers from the network.
return http_version, event.status_code, event.headers
async def _receive_response_data(
- self, timeout: TimeoutConfig = None
+ self, timeout: Timeout = None
) -> typing.AsyncIterator[bytes]:
"""
Read the response data from the network.
assert isinstance(event, h11.EndOfMessage) or event is h11.PAUSED
break # pragma: no cover
- async def _receive_event(self, timeout: TimeoutConfig = None) -> H11Event:
+ async def _receive_event(self, timeout: Timeout = None) -> H11Event:
"""
Read a single `h11` event, reading more data from the network if needed.
"""
TimeoutFlag,
lookup_backend,
)
-from ..config import TimeoutConfig, TimeoutTypes
+from ..config import Timeout, TimeoutTypes
from ..exceptions import ProtocolError
from ..models import Request, Response
from ..utils import get_logger
self.window_update_received = {} # type: typing.Dict[int, BaseEvent]
async def send(self, request: Request, timeout: TimeoutTypes = None) -> Response:
- timeout = None if timeout is None else TimeoutConfig(timeout)
+ timeout = None if timeout is None else Timeout(timeout)
# Start sending the request.
if not self.initialized:
self.stream.write_no_block(data_to_send)
self.initialized = True
- async def send_headers(
- self, request: Request, timeout: TimeoutConfig = None
- ) -> int:
+ async def send_headers(self, request: Request, timeout: Timeout = None) -> int:
stream_id = self.h2_state.get_next_available_stream_id()
headers = [
(b":method", request.method.encode("ascii")),
self,
stream_id: int,
stream: typing.AsyncIterator[bytes],
- timeout: TimeoutConfig = None,
+ timeout: Timeout = None,
) -> None:
try:
async for data in stream:
self.timeout_flags[stream_id].set_read_timeouts()
async def send_data(
- self, stream_id: int, data: bytes, timeout: TimeoutConfig = None
+ self, stream_id: int, data: bytes, timeout: Timeout = None
) -> None:
while data:
# The data will be divided into frames to send based on the flow control
data_to_send = self.h2_state.data_to_send()
await self.stream.write(data_to_send, timeout)
- async def end_stream(self, stream_id: int, timeout: TimeoutConfig = None) -> None:
+ async def end_stream(self, stream_id: int, timeout: Timeout = None) -> None:
logger.trace(f"end_stream stream_id={stream_id}")
self.h2_state.end_stream(stream_id)
data_to_send = self.h2_state.data_to_send()
await self.stream.write(data_to_send, timeout)
async def receive_response(
- self, stream_id: int, timeout: TimeoutConfig = None
+ self, stream_id: int, timeout: Timeout = None
) -> typing.Tuple[int, typing.List[typing.Tuple[bytes, bytes]]]:
"""
Read the response status and headers from the network.
return (status_code, headers)
async def body_iter(
- self, stream_id: int, timeout: TimeoutConfig = None
+ self, stream_id: int, timeout: Timeout = None
) -> typing.AsyncIterator[bytes]:
while True:
event = await self.receive_event(stream_id, timeout)
break
async def receive_event(
- self, stream_id: int, timeout: TimeoutConfig = None
+ self, stream_id: int, timeout: Timeout = None
) -> h2.events.Event:
while not self.events[stream_id]:
flag = self.timeout_flags[stream_id]
# Timeout exceptions...
-class Timeout(HTTPError):
+class RequestTimeout(HTTPError):
"""
A base class for all timeouts.
"""
-class ConnectTimeout(Timeout):
+class ConnectTimeout(RequestTimeout):
"""
Timeout while establishing a connection.
"""
-class ReadTimeout(Timeout):
+class ReadTimeout(RequestTimeout):
"""
Timeout while reading response data.
"""
-class WriteTimeout(Timeout):
+class WriteTimeout(RequestTimeout):
"""
Timeout while writing request data.
"""
-class PoolTimeout(Timeout):
+class PoolTimeout(RequestTimeout):
"""
Timeout while waiting to acquire a connection from the pool.
"""
import h2.connection
import h2.events
-from httpx import AsyncioBackend, BaseSocketStream, Request, TimeoutConfig
+from httpx import AsyncioBackend, BaseSocketStream, Request, Timeout
from tests.concurrency import sleep
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> BaseSocketStream:
self.server = MockHTTP2Server(self.app, backend=self.backend)
return self.server
hostname: str,
port: int,
ssl_context: typing.Optional[ssl.SSLContext],
- timeout: TimeoutConfig,
+ timeout: Timeout,
) -> BaseSocketStream:
self.received_data.append(
b"--- CONNECT(%s, %d) ---" % (hostname.encode(), port)
self.backend = backend
async def start_tls(
- self, hostname: str, ssl_context: ssl.SSLContext, timeout: TimeoutConfig
+ self, hostname: str, ssl_context: ssl.SSLContext, timeout: Timeout
) -> BaseSocketStream:
self.backend.received_data.append(b"--- START_TLS(%s) ---" % hostname.encode())
return MockRawSocketStream(self.backend)
def write_no_block(self, data: bytes) -> None:
self.backend.received_data.append(data)
- async def write(self, data: bytes, timeout: TimeoutConfig = None) -> None:
+ async def write(self, data: bytes, timeout: Timeout = None) -> None:
if data:
self.write_no_block(data)
import pytest
import trio
-from httpx import AsyncioBackend, SSLConfig, TimeoutConfig
+from httpx import AsyncioBackend, SSLConfig, Timeout
from httpx.concurrency.trio import TrioBackend
from tests.concurrency import run_concurrently
)
async def test_start_tls_on_tcp_socket_stream(https_server, backend, get_cipher):
ctx = SSLConfig().load_ssl_context_no_verify()
- timeout = TimeoutConfig(5)
+ timeout = Timeout(5)
stream = await backend.open_tcp_stream(
https_server.url.host, https_server.url.port, None, timeout
)
async def test_start_tls_on_uds_socket_stream(https_uds_server, backend, get_cipher):
ctx = SSLConfig().load_ssl_context_no_verify()
- timeout = TimeoutConfig(5)
+ timeout = Timeout(5)
stream = await backend.open_uds_stream(
https_uds_server.config.uds, https_uds_server.url.host, None, timeout
Regression test for: https://github.com/encode/httpx/issues/527
"""
stream = await backend.open_tcp_stream(
- server.url.host, server.url.port, ssl_context=None, timeout=TimeoutConfig(5)
+ server.url.host, server.url.port, ssl_context=None, timeout=Timeout(5)
)
try:
await stream.write(b"GET / HTTP/1.1\r\n\r\n")
def test_timeout_eq():
- timeout = httpx.TimeoutConfig(timeout=5.0)
- assert timeout == httpx.TimeoutConfig(timeout=5.0)
+ timeout = httpx.Timeout(timeout=5.0)
+ assert timeout == httpx.Timeout(timeout=5.0)
def test_timeout_from_nothing():
- timeout = httpx.TimeoutConfig()
+ timeout = httpx.Timeout()
assert timeout.connect_timeout is None
assert timeout.read_timeout is None
assert timeout.write_timeout is None
def test_timeout_from_none():
- timeout = httpx.TimeoutConfig(timeout=None)
- assert timeout == httpx.TimeoutConfig()
+ timeout = httpx.Timeout(timeout=None)
+ assert timeout == httpx.Timeout()
def test_timeout_from_one_none_value():
- timeout = httpx.TimeoutConfig(read_timeout=None)
- assert timeout == httpx.TimeoutConfig()
+ timeout = httpx.Timeout(read_timeout=None)
+ assert timeout == httpx.Timeout()
def test_timeout_from_tuple():
- timeout = httpx.TimeoutConfig(timeout=(5.0, 5.0, 5.0, 5.0))
- assert timeout == httpx.TimeoutConfig(timeout=5.0)
+ timeout = httpx.Timeout(timeout=(5.0, 5.0, 5.0, 5.0))
+ assert timeout == httpx.Timeout(timeout=5.0)
def test_timeout_from_config_instance():
- timeout = httpx.TimeoutConfig(timeout=5.0)
- assert httpx.TimeoutConfig(timeout) == httpx.TimeoutConfig(timeout=5.0)
+ timeout = httpx.Timeout(timeout=5.0)
+ assert httpx.Timeout(timeout) == httpx.Timeout(timeout=5.0)
def test_timeout_repr():
- timeout = httpx.TimeoutConfig(timeout=5.0)
- assert repr(timeout) == "TimeoutConfig(timeout=5.0)"
+ timeout = httpx.Timeout(timeout=5.0)
+ assert repr(timeout) == "Timeout(timeout=5.0)"
- timeout = httpx.TimeoutConfig(read_timeout=5.0)
+ timeout = httpx.Timeout(read_timeout=5.0)
assert repr(timeout) == (
- "TimeoutConfig(connect_timeout=None, read_timeout=5.0, "
+ "Timeout(connect_timeout=None, read_timeout=5.0, "
"write_timeout=None, pool_timeout=None)"
)
PoolLimits,
PoolTimeout,
ReadTimeout,
- TimeoutConfig,
+ Timeout,
WriteTimeout,
)
async def test_read_timeout(server, backend):
- timeout = TimeoutConfig(read_timeout=1e-6)
+ timeout = Timeout(read_timeout=1e-6)
async with Client(timeout=timeout, backend=backend) as client:
with pytest.raises(ReadTimeout):
async def test_write_timeout(server, backend):
- timeout = TimeoutConfig(write_timeout=1e-6)
+ timeout = Timeout(write_timeout=1e-6)
async with Client(timeout=timeout, backend=backend) as client:
with pytest.raises(WriteTimeout):
async def test_connect_timeout(server, backend):
- timeout = TimeoutConfig(connect_timeout=1e-6)
+ timeout = Timeout(connect_timeout=1e-6)
async with Client(timeout=timeout, backend=backend) as client:
with pytest.raises(ConnectTimeout):
async def test_pool_timeout(server, backend):
pool_limits = PoolLimits(hard_limit=1)
- timeout = TimeoutConfig(pool_timeout=1e-4)
+ timeout = Timeout(pool_timeout=1e-4)
async with Client(
pool_limits=pool_limits, timeout=timeout, backend=backend