T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")
+SOCKET_OPTION = typing.Union[
+ typing.Tuple[int, int, int],
+ typing.Tuple[int, int, typing.Union[bytes, bytearray]],
+ typing.Tuple[int, int, None, int],
+]
+
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
uds: typing.Optional[str] = None,
local_address: typing.Optional[str] = None,
retries: int = 0,
+ socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
) -> None:
ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
uds=uds,
local_address=local_address,
retries=retries,
+ socket_options=socket_options,
)
elif proxy.url.scheme in ("http", "https"):
self._pool = httpcore.HTTPProxy(
keepalive_expiry=limits.keepalive_expiry,
http1=http1,
http2=http2,
+ socket_options=socket_options,
)
elif proxy.url.scheme == "socks5":
try:
uds: typing.Optional[str] = None,
local_address: typing.Optional[str] = None,
retries: int = 0,
+ socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
) -> None:
ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
uds=uds,
local_address=local_address,
retries=retries,
+ socket_options=socket_options,
)
elif proxy.url.scheme in ("http", "https"):
self._pool = httpcore.AsyncHTTPProxy(
keepalive_expiry=limits.keepalive_expiry,
http1=http1,
http2=http2,
+ socket_options=socket_options,
)
elif proxy.url.scheme == "socks5":
try: