DEFAULT_TIMEOUT_CONFIG,
CertTypes,
PoolLimits,
+ Timeout,
TimeoutTypes,
VerifyTypes,
)
logger = get_logger(__name__)
+class UnsetType:
+ pass # pragma: nocover
+
+
+UNSET = UnsetType()
+
+
class Client:
"""
An HTTP client, with connection pooling, HTTP/2, redirects, cookie persistence, etc.
dispatch = ConnectionPool(
verify=verify,
cert=cert,
- timeout=timeout,
http2=http2,
pool_limits=pool_limits,
backend=backend,
self._params = QueryParams(params)
self._headers = Headers(headers)
self._cookies = Cookies(cookies)
+ self.timeout = Timeout(timeout)
self.max_redirects = max_redirects
self.trust_env = trust_env
self.dispatch = dispatch
proxies,
verify=verify,
cert=cert,
- timeout=timeout,
http2=http2,
pool_limits=pool_limits,
backend=backend,
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
request = self.build_request(
allow_redirects: bool = True,
verify: VerifyTypes = None,
cert: CertTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
if request.url.scheme not in ("http", "https"):
auth = self.auth if auth is None else auth
trust_env = self.trust_env if trust_env is None else trust_env
+ timeout = self.timeout if isinstance(timeout, UnsetType) else Timeout(timeout)
if not isinstance(auth, Middleware):
request = self.authenticate(request, trust_env, auth)
async def send_handling_redirects(
self,
request: Request,
+ timeout: Timeout,
verify: VerifyTypes = None,
cert: CertTypes = None,
- timeout: TimeoutTypes = None,
allow_redirects: bool = True,
history: typing.List[Response] = None,
) -> Response:
async def send_single_request(
self,
request: Request,
+ timeout: Timeout,
verify: VerifyTypes = None,
cert: CertTypes = None,
- timeout: TimeoutTypes = None,
) -> Response:
"""
Sends a single request, without handling any redirections.
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
return await self.request(
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
return await self.request(
allow_redirects: bool = False, # NOTE: Differs to usual default.
cert: CertTypes = None,
verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
return await self.request(
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
return await self.request(
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
return await self.request(
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
return await self.request(
allow_redirects: bool = True,
cert: CertTypes = None,
verify: VerifyTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
trust_env: bool = None,
) -> Response:
return await self.request(
proxies: typing.Optional[ProxiesTypes],
verify: VerifyTypes,
cert: typing.Optional[CertTypes],
- timeout: TimeoutTypes,
http2: bool,
pool_limits: PoolLimits,
backend: typing.Union[str, ConcurrencyBackend],
trust_env: bool,
) -> typing.Dict[str, Dispatcher]:
def _proxy_from_url(url: URLTypes) -> Dispatcher:
- nonlocal verify, cert, timeout, http2, pool_limits, backend, trust_env
+ nonlocal verify, cert, http2, pool_limits, backend, trust_env
url = URL(url)
if url.scheme in ("http", "https"):
return HTTPProxy(
url,
verify=verify,
cert=cert,
- timeout=timeout,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
) -> "BaseSocketStream":
raise NotImplementedError() # pragma: no cover
- async def read(
- self, n: int, timeout: Timeout = None, flag: typing.Any = None
- ) -> bytes:
+ async def read(self, n: int, timeout: Timeout, flag: typing.Any = None) -> bytes:
raise NotImplementedError() # pragma: no cover
def write_no_block(self, data: bytes) -> None:
raise NotImplementedError() # pragma: no cover
- async def write(self, data: bytes, timeout: Timeout = None) -> None:
+ async def write(self, data: bytes, timeout: Timeout) -> None:
raise NotImplementedError() # pragma: no cover
async def close(self) -> None:
import typing
from types import TracebackType
-from ..config import CertTypes, TimeoutTypes, VerifyTypes
+from ..config import CertTypes, Timeout, VerifyTypes
from ..models import (
HeaderTypes,
QueryParamTypes,
headers: HeaderTypes = None,
verify: VerifyTypes = None,
cert: CertTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: Timeout = None,
) -> Response:
request = Request(method, url, data=data, params=params, headers=headers)
return await self.send(request, verify=verify, cert=cert, timeout=timeout)
request: Request,
verify: VerifyTypes = None,
cert: CertTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: Timeout = None,
) -> Response:
raise NotImplementedError() # pragma: nocover
import typing
from ..concurrency.base import ConcurrencyBackend, lookup_backend
-from ..config import (
- DEFAULT_TIMEOUT_CONFIG,
- CertTypes,
- SSLConfig,
- Timeout,
- TimeoutTypes,
- VerifyTypes,
-)
+from ..config import CertTypes, SSLConfig, Timeout, VerifyTypes
from ..models import Origin, Request, Response
from ..utils import get_logger
from .base import Dispatcher
verify: VerifyTypes = True,
cert: CertTypes = None,
trust_env: bool = None,
- timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
http2: bool = False,
backend: typing.Union[str, ConcurrencyBackend] = "auto",
release_func: typing.Optional[ReleaseCallback] = None,
):
self.origin = Origin(origin) if isinstance(origin, str) else origin
self.ssl = SSLConfig(cert=cert, verify=verify, trust_env=trust_env)
- self.timeout = Timeout(timeout)
self.http2 = http2
self.backend = lookup_backend(backend)
self.release_func = release_func
request: Request,
verify: VerifyTypes = None,
cert: CertTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: Timeout = None,
) -> Response:
+ timeout = Timeout() if timeout is None else timeout
+
if self.h11_connection is None and self.h2_connection is None:
await self.connect(verify=verify, cert=cert, timeout=timeout)
return response
async def connect(
- self,
- verify: VerifyTypes = None,
- cert: CertTypes = None,
- timeout: TimeoutTypes = None,
+ self, timeout: Timeout, verify: VerifyTypes = None, cert: CertTypes = None,
) -> None:
ssl = self.ssl.with_overrides(verify=verify, cert=cert)
- timeout = self.timeout if timeout is None else Timeout(timeout)
host = self.origin.host
port = self.origin.port
import typing
from ..concurrency.base import BasePoolSemaphore, ConcurrencyBackend, lookup_backend
-from ..config import (
- DEFAULT_POOL_LIMITS,
- DEFAULT_TIMEOUT_CONFIG,
- CertTypes,
- PoolLimits,
- Timeout,
- TimeoutTypes,
- VerifyTypes,
-)
+from ..config import DEFAULT_POOL_LIMITS, CertTypes, PoolLimits, Timeout, VerifyTypes
from ..models import Origin, Request, Response
from ..utils import get_logger
from .base import Dispatcher
verify: VerifyTypes = True,
cert: CertTypes = None,
trust_env: bool = None,
- timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
http2: bool = False,
backend: typing.Union[str, ConcurrencyBackend] = "auto",
):
self.verify = verify
self.cert = cert
- self.timeout = Timeout(timeout)
self.pool_limits = pool_limits
self.http2 = http2
self.is_closed = False
request: Request,
verify: VerifyTypes = None,
cert: CertTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: Timeout = None,
) -> Response:
connection = await self.acquire_connection(
origin=request.url.origin, timeout=timeout
return response
async def acquire_connection(
- self, origin: Origin, timeout: TimeoutTypes = None
+ self, origin: Origin, timeout: Timeout = None
) -> HTTPConnection:
logger.trace(f"acquire_connection origin={origin!r}")
connection = self.pop_connection(origin)
if connection is None:
- if timeout is None:
- pool_timeout = self.timeout.pool_timeout
- else:
- pool_timeout = Timeout(timeout).pool_timeout
+ pool_timeout = None if timeout is None else timeout.pool_timeout
await self.max_connections.acquire(timeout=pool_timeout)
connection = HTTPConnection(
origin,
verify=self.verify,
cert=self.cert,
- timeout=self.timeout,
http2=self.http2,
backend=self.backend,
release_func=self.release_connection,
import h11
from ..concurrency.base import BaseSocketStream, TimeoutFlag
-from ..config import Timeout, TimeoutTypes
+from ..config import Timeout
from ..exceptions import ConnectionClosed, ProtocolError
from ..models import Request, Response
from ..utils import get_logger
self.h11_state = h11.Connection(our_role=h11.CLIENT)
self.timeout_flag = TimeoutFlag()
- async def send(self, request: Request, timeout: TimeoutTypes = None) -> Response:
- timeout = None if timeout is None else Timeout(timeout)
+ async def send(self, request: Request, timeout: Timeout = None) -> Response:
+ timeout = Timeout() if timeout is None else timeout
await self._send_request(request, timeout)
await self._send_request_body(request, timeout)
pass
await self.stream.close()
- async def _send_request(self, request: Request, timeout: Timeout = None) -> None:
+ async def _send_request(self, request: Request, timeout: Timeout) -> None:
"""
Send the request method, URL, and headers to the network.
"""
event = h11.Request(method=method, target=target, headers=headers)
await self._send_event(event, timeout)
- async def _send_request_body(
- self, request: Request, timeout: Timeout = None
- ) -> None:
+ async def _send_request_body(self, request: Request, timeout: Timeout) -> None:
"""
Send the request body to the network.
"""
# Once we've sent the request, we enable read timeouts.
self.timeout_flag.set_read_timeouts()
- async def _send_event(self, event: H11Event, timeout: Timeout = None) -> None:
+ async def _send_event(self, event: H11Event, timeout: Timeout) -> None:
"""
Send a single `h11` event to the network, waiting for the data to
drain before returning.
await self.stream.write(bytes_to_send, timeout)
async def _receive_response(
- self, timeout: Timeout = None
+ self, timeout: Timeout
) -> typing.Tuple[str, int, typing.List[typing.Tuple[bytes, bytes]]]:
"""
Read the response status and headers from the network.
return http_version, event.status_code, event.headers
async def _receive_response_data(
- self, timeout: Timeout = None
+ self, timeout: Timeout
) -> typing.AsyncIterator[bytes]:
"""
Read the response data from the network.
assert isinstance(event, h11.EndOfMessage) or event is h11.PAUSED
break # pragma: no cover
- async def _receive_event(self, timeout: Timeout = None) -> H11Event:
+ async def _receive_event(self, timeout: Timeout) -> H11Event:
"""
Read a single `h11` event, reading more data from the network if needed.
"""
TimeoutFlag,
lookup_backend,
)
-from ..config import Timeout, TimeoutTypes
+from ..config import Timeout
from ..exceptions import ProtocolError
from ..models import Request, Response
from ..utils import get_logger
self.initialized = False
self.window_update_received = {} # type: typing.Dict[int, BaseEvent]
- async def send(self, request: Request, timeout: TimeoutTypes = None) -> Response:
- timeout = None if timeout is None else Timeout(timeout)
+ async def send(self, request: Request, timeout: Timeout = None) -> Response:
+ timeout = Timeout() if timeout is None else timeout
# Start sending the request.
if not self.initialized:
self.stream.write_no_block(data_to_send)
self.initialized = True
- async def send_headers(self, request: Request, timeout: Timeout = None) -> int:
+ async def send_headers(self, request: Request, timeout: Timeout) -> int:
stream_id = self.h2_state.get_next_available_stream_id()
headers = [
(b":method", request.method.encode("ascii")),
return stream_id
async def send_request_data(
- self,
- stream_id: int,
- stream: typing.AsyncIterator[bytes],
- timeout: Timeout = None,
+ self, stream_id: int, stream: typing.AsyncIterator[bytes], timeout: Timeout,
) -> None:
try:
async for data in stream:
# Once we've sent the request we should enable read timeouts.
self.timeout_flags[stream_id].set_read_timeouts()
- async def send_data(
- self, stream_id: int, data: bytes, timeout: Timeout = None
- ) -> None:
+ async def send_data(self, stream_id: int, data: bytes, timeout: Timeout) -> None:
while data:
# The data will be divided into frames to send based on the flow control
# window and the maximum frame size. Because the flow control window
data_to_send = self.h2_state.data_to_send()
await self.stream.write(data_to_send, timeout)
- async def end_stream(self, stream_id: int, timeout: Timeout = None) -> None:
+ async def end_stream(self, stream_id: int, timeout: Timeout) -> None:
logger.trace(f"end_stream stream_id={stream_id}")
self.h2_state.end_stream(stream_id)
data_to_send = self.h2_state.data_to_send()
await self.stream.write(data_to_send, timeout)
async def receive_response(
- self, stream_id: int, timeout: Timeout = None
+ self, stream_id: int, timeout: Timeout
) -> typing.Tuple[int, typing.List[typing.Tuple[bytes, bytes]]]:
"""
Read the response status and headers from the network.
return (status_code, headers)
async def body_iter(
- self, stream_id: int, timeout: Timeout = None
+ self, stream_id: int, timeout: Timeout
) -> typing.AsyncIterator[bytes]:
while True:
event = await self.receive_event(stream_id, timeout)
elif isinstance(event, (h2.events.StreamEnded, h2.events.StreamReset)):
break
- async def receive_event(
- self, stream_id: int, timeout: Timeout = None
- ) -> h2.events.Event:
+ async def receive_event(self, stream_id: int, timeout: Timeout) -> h2.events.Event:
while not self.events[stream_id]:
flag = self.timeout_flags[stream_id]
data = await self.stream.read(self.READ_NUM_BYTES, timeout, flag=flag)
from ..concurrency.base import ConcurrencyBackend
from ..config import (
DEFAULT_POOL_LIMITS,
- DEFAULT_TIMEOUT_CONFIG,
CertTypes,
PoolLimits,
SSLConfig,
- TimeoutTypes,
+ Timeout,
VerifyTypes,
)
from ..exceptions import ProxyError
verify: VerifyTypes = True,
cert: CertTypes = None,
trust_env: bool = None,
- timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
http2: bool = False,
backend: typing.Union[str, ConcurrencyBackend] = "auto",
super(HTTPProxy, self).__init__(
verify=verify,
cert=cert,
- timeout=timeout,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
return f"Basic {token}"
async def acquire_connection(
- self, origin: Origin, timeout: TimeoutTypes = None
+ self, origin: Origin, timeout: Timeout = None
) -> HTTPConnection:
if self.should_forward_origin(origin):
logger.trace(
logger.trace(
f"tunnel_connection proxy_url={self.proxy_url!r} origin={origin!r}"
)
- return await self.tunnel_connection(origin)
+ return await self.tunnel_connection(origin, timeout)
- async def tunnel_connection(self, origin: Origin) -> HTTPConnection:
+ async def tunnel_connection(
+ self, origin: Origin, timeout: Timeout = None
+ ) -> HTTPConnection:
"""Creates a new HTTPConnection via the CONNECT method
usually reserved for proxying HTTPS connections.
"""
connection.origin = origin
self.active_connections.add(connection)
- await self.tunnel_start_tls(origin, connection)
+ timeout = Timeout() if timeout is None else timeout
+ await self.tunnel_start_tls(origin, connection, timeout)
else:
self.active_connections.add(connection)
self.proxy_url.origin,
verify=self.verify,
cert=self.cert,
- timeout=self.timeout,
backend=self.backend,
http2=False, # Short-lived 'connection'
trust_env=self.trust_env,
return connection
async def tunnel_start_tls(
- self, origin: Origin, connection: HTTPConnection
+ self, origin: Origin, connection: HTTPConnection, timeout: Timeout = None
) -> None:
"""Runs start_tls() on a TCP-tunneled connection"""
+ timeout = Timeout() if timeout is None else timeout
# Store this information here so that we can transfer
# it to the new internal connection object after
# HTTP connection object and run start_tls()
if origin.is_ssl:
ssl_config = SSLConfig(cert=self.cert, verify=self.verify)
- timeout = connection.timeout
ssl_context = await connection.get_ssl_context(ssl_config)
assert ssl_context is not None
request: Request,
verify: VerifyTypes = None,
cert: CertTypes = None,
- timeout: TimeoutTypes = None,
+ timeout: Timeout = None,
) -> Response:
if self.should_forward_origin(request.url.origin):
for prop in [
"verify",
"cert",
- "timeout",
"pool_limits",
]:
assert getattr(pool, prop) == getattr(proxy, prop)