* Finesse timeout argument.
* Drop unused imports
* Add 'cert' and 'verify' arguments
from .api import delete, get, head, options, patch, post, put, request
from .client import AsyncClient, Client
from .concurrency import AsyncioBackend
-from .config import PoolLimits, SSLConfig, TimeoutConfig
+from .config import (
+ CertTypes,
+ PoolLimits,
+ SSLConfig,
+ TimeoutConfig,
+ TimeoutTypes,
+ VerifyTypes,
+)
from .dispatch.connection import HTTPConnection
from .dispatch.connection_pool import ConnectionPool
from .exceptions import (
import typing
from .client import Client
-from .config import SSLConfig, TimeoutConfig
+from .config import CertTypes, TimeoutTypes, VerifyTypes
from .models import (
AuthTypes,
CookieTypes,
method: str,
url: URLTypes,
*,
+ params: QueryParamTypes = None,
data: RequestData = b"",
json: typing.Any = None,
- params: QueryParamTypes = None,
headers: HeaderTypes = None,
cookies: CookieTypes = None,
- stream: bool = False,
+ # files
auth: AuthTypes = None,
+ timeout: TimeoutTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ # proxies
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ stream: bool = False,
) -> SyncResponse:
with Client() as client:
return client.request(
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ cert=cert,
+ verify=verify,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return request(
"GET",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ cert=cert,
+ verify=verify,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return request(
"OPTIONS",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ cert=cert,
+ verify=verify,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = False, # Note: Differs to usual default.
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return request(
"HEAD",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ cert=cert,
+ verify=verify,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return request(
"POST",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ cert=cert,
+ verify=verify,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return request(
"PUT",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ cert=cert,
+ verify=verify,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return request(
"PATCH",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ cert=cert,
+ verify=verify,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return request(
"DELETE",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ cert=cert,
+ verify=verify,
timeout=timeout,
)
from .config import (
DEFAULT_MAX_REDIRECTS,
DEFAULT_POOL_LIMITS,
- DEFAULT_SSL_CONFIG,
DEFAULT_TIMEOUT_CONFIG,
+ CertTypes,
PoolLimits,
- SSLConfig,
- TimeoutConfig,
+ TimeoutTypes,
+ VerifyTypes,
)
from .dispatch.connection_pool import ConnectionPool
from .exceptions import RedirectBodyUnavailable, RedirectLoop, TooManyRedirects
self,
auth: AuthTypes = None,
cookies: CookieTypes = None,
- ssl: SSLConfig = DEFAULT_SSL_CONFIG,
- timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
+ verify: VerifyTypes = True,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
dispatch: Dispatcher = None,
):
if dispatch is None:
dispatch = ConnectionPool(
- ssl=ssl, timeout=timeout, pool_limits=pool_limits, backend=backend
+ verify=verify,
+ cert=cert,
+ timeout=timeout,
+ pool_limits=pool_limits,
+ backend=backend,
)
self.auth = auth
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
return await self.request(
"GET",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
return await self.request(
"OPTIONS",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = False, # Note: Differs to usual default.
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
return await self.request(
"HEAD",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
return await self.request(
"POST",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
return await self.request(
"PUT",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
return await self.request(
"PATCH",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
return await self.request(
"DELETE",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
request = Request(
method,
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
return response
*,
stream: bool = False,
auth: AuthTypes = None,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
allow_redirects: bool = True,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
if auth is None:
auth = self.auth
response = await self.send_handling_redirects(
request,
stream=stream,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
allow_redirects=allow_redirects,
)
request: Request,
*,
stream: bool = False,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
allow_redirects: bool = True,
history: typing.List[Response] = None,
) -> Response:
raise RedirectLoop()
response = await self.dispatch.send(
- request, stream=stream, ssl=ssl, timeout=timeout
+ request, stream=stream, verify=verify, cert=cert, timeout=timeout
)
response.history = list(history)
self.cookies.extract_cookies(response)
else:
async def send_next() -> Response:
- nonlocal request, response, ssl, allow_redirects, timeout, history
+ nonlocal request, response, verify, cert, allow_redirects, timeout, history
request = self.build_redirect_request(request, response)
response = await self.send_handling_redirects(
request,
stream=stream,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
history=history,
)
def __init__(
self,
auth: AuthTypes = None,
- ssl: SSLConfig = DEFAULT_SSL_CONFIG,
- timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
+ cert: CertTypes = None,
+ verify: VerifyTypes = True,
+ timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
dispatch: Dispatcher = None,
) -> None:
self._client = AsyncClient(
auth=auth,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
pool_limits=pool_limits,
max_redirects=max_redirects,
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
request = Request(
method,
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
return response
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return self.request(
"GET",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return self.request(
"OPTIONS",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = False, # Note: Differs to usual default.
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return self.request(
"HEAD",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return self.request(
"POST",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return self.request(
"PUT",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return self.request(
"PATCH",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ cert: CertTypes = None,
+ verify: VerifyTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
return self.request(
"DELETE",
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
stream: bool = False,
auth: AuthTypes = None,
allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> SyncResponse:
response = self._loop.run_until_complete(
self._client.send(
stream=stream,
auth=auth,
allow_redirects=allow_redirects,
- ssl=ssl,
+ verify=verify,
+ cert=cert,
timeout=timeout,
)
)
Protocol,
)
-OptionalTimeout = typing.Optional[TimeoutConfig]
-
-
SSL_MONKEY_PATCH_APPLIED = False
self.stream_reader = stream_reader
self.timeout = timeout
- async def read(self, n: int, timeout: OptionalTimeout = None) -> bytes:
+ async def read(self, n: int, timeout: TimeoutConfig = None) -> bytes:
if timeout is None:
timeout = self.timeout
def write_no_block(self, data: bytes) -> None:
self.stream_writer.write(data) # pragma: nocover
- async def write(self, data: bytes, timeout: OptionalTimeout = None) -> None:
+ async def write(self, data: bytes, timeout: TimeoutConfig = None) -> None:
if not data:
return
import certifi
+CertTypes = typing.Union[str, typing.Tuple[str, str]]
+VerifyTypes = typing.Union[str, bool]
+TimeoutTypes = typing.Union[float, typing.Tuple[float, float, float], "TimeoutConfig"]
+
class SSLConfig:
"""
SSL Configuration.
"""
- def __init__(
- self,
- *,
- cert: typing.Union[None, str, typing.Tuple[str, str]] = None,
- verify: typing.Union[str, bool] = True,
- ):
+ def __init__(self, *, cert: CertTypes = None, verify: VerifyTypes = True):
self.cert = cert
self.verify = verify
class_name = self.__class__.__name__
return f"{class_name}(cert={self.cert}, verify={self.verify})"
+ def with_overrides(
+ self, cert: CertTypes = None, verify: VerifyTypes = None
+ ) -> "SSLConfig":
+ cert = self.cert if cert is None else cert
+ verify = self.verify if verify is None else verify
+ if (cert == self.cert) and (verify == self.verify):
+ return self
+ return SSLConfig(cert=cert, verify=verify)
+
async def load_ssl_context(self) -> ssl.SSLContext:
if not hasattr(self, "ssl_context"):
if not self.verify:
def __init__(
self,
- timeout: float = None,
+ timeout: TimeoutTypes = None,
*,
connect_timeout: float = None,
read_timeout: float = None,
write_timeout: float = None,
):
- if timeout is not None:
+ if timeout is None:
+ self.connect_timeout = connect_timeout
+ self.read_timeout = read_timeout
+ self.write_timeout = write_timeout
+ else:
# Specified as a single timeout value
assert connect_timeout is None
assert read_timeout is None
assert write_timeout is None
- connect_timeout = timeout
- read_timeout = timeout
- write_timeout = timeout
-
- self.timeout = timeout
- self.connect_timeout = connect_timeout
- self.read_timeout = read_timeout
- self.write_timeout = write_timeout
+ if isinstance(timeout, TimeoutConfig):
+ self.connect_timeout = timeout.connect_timeout
+ self.read_timeout = timeout.read_timeout
+ self.write_timeout = timeout.write_timeout
+ elif isinstance(timeout, tuple):
+ self.connect_timeout = timeout[0]
+ self.read_timeout = timeout[1]
+ self.write_timeout = timeout[2]
+ else:
+ self.connect_timeout = timeout
+ self.read_timeout = timeout
+ self.write_timeout = timeout
def __eq__(self, other: typing.Any) -> bool:
return (
def __repr__(self) -> str:
class_name = self.__class__.__name__
- if self.timeout is not None:
- return f"{class_name}(timeout={self.timeout})"
+ if len(set([self.connect_timeout, self.read_timeout, self.write_timeout])) == 1:
+ return f"{class_name}(timeout={self.connect_timeout})"
return f"{class_name}(connect_timeout={self.connect_timeout}, read_timeout={self.read_timeout}, write_timeout={self.write_timeout})"
from ..config import (
DEFAULT_SSL_CONFIG,
DEFAULT_TIMEOUT_CONFIG,
+ CertTypes,
SSLConfig,
TimeoutConfig,
+ TimeoutTypes,
+ VerifyTypes,
)
from ..exceptions import ConnectTimeout
from ..interfaces import ConcurrencyBackend, Dispatcher, Protocol
def __init__(
self,
origin: typing.Union[str, Origin],
- ssl: SSLConfig = DEFAULT_SSL_CONFIG,
- timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
+ verify: VerifyTypes = True,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
backend: ConcurrencyBackend = None,
release_func: typing.Optional[ReleaseCallback] = None,
):
self.origin = Origin(origin) if isinstance(origin, str) else origin
- self.ssl = ssl
- self.timeout = timeout
+ self.ssl = SSLConfig(cert=cert, verify=verify)
+ self.timeout = TimeoutConfig(timeout)
self.backend = AsyncioBackend() if backend is None else backend
self.release_func = release_func
self.h11_connection = None # type: typing.Optional[HTTP11Connection]
self,
request: Request,
stream: bool = False,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
if self.h11_connection is None and self.h2_connection is None:
- await self.connect(ssl=ssl, timeout=timeout)
+ await self.connect(verify=verify, cert=cert, timeout=timeout)
if self.h2_connection is not None:
response = await self.h2_connection.send(
return response
async def connect(
- self, ssl: SSLConfig = None, timeout: TimeoutConfig = None
+ self,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> None:
- if ssl is None:
- ssl = self.ssl
- if timeout is None:
- timeout = self.timeout
+ ssl = self.ssl.with_overrides(verify=verify, cert=cert)
+ timeout = self.timeout if timeout is None else TimeoutConfig(timeout)
host = self.origin.host
port = self.origin.port
from ..config import (
DEFAULT_CA_BUNDLE_PATH,
DEFAULT_POOL_LIMITS,
- DEFAULT_SSL_CONFIG,
DEFAULT_TIMEOUT_CONFIG,
+ CertTypes,
PoolLimits,
- SSLConfig,
- TimeoutConfig,
+ TimeoutTypes,
+ VerifyTypes,
)
from ..decoders import ACCEPT_ENCODING
from ..exceptions import PoolTimeout
def __init__(
self,
*,
- ssl: SSLConfig = DEFAULT_SSL_CONFIG,
- timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
+ verify: VerifyTypes = True,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
backend: ConcurrencyBackend = None,
):
- self.ssl = ssl
+ self.verify = verify
+ self.cert = cert
self.timeout = timeout
self.pool_limits = pool_limits
self.is_closed = False
self,
request: Request,
stream: bool = False,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
connection = await self.acquire_connection(request.url.origin)
try:
response = await connection.send(
- request, stream=stream, ssl=ssl, timeout=timeout
+ request, stream=stream, verify=verify, cert=cert, timeout=timeout
)
except BaseException as exc:
self.active_connections.remove(connection)
await self.max_connections.acquire()
connection = HTTPConnection(
origin,
- ssl=self.ssl,
+ verify=self.verify,
+ cert=self.cert,
timeout=self.timeout,
backend=self.backend,
release_func=self.release_connection,
import h11
-from ..config import (
- DEFAULT_SSL_CONFIG,
- DEFAULT_TIMEOUT_CONFIG,
- SSLConfig,
- TimeoutConfig,
-)
+from ..config import DEFAULT_TIMEOUT_CONFIG, TimeoutConfig, TimeoutTypes
from ..exceptions import ConnectTimeout, ReadTimeout
from ..interfaces import BaseReader, BaseWriter, Dispatcher
from ..models import Request, Response
]
-OptionalTimeout = typing.Optional[TimeoutConfig]
-
# Callback signature: async def callback() -> None
# In practice the callback will be a functools partial, which binds
# the `ConnectionPool.release_connection(conn: HTTPConnection)` method.
self.h11_state = h11.Connection(our_role=h11.CLIENT)
async def send(
- self, request: Request, stream: bool = False, timeout: TimeoutConfig = None
+ self, request: Request, stream: bool = False, timeout: TimeoutTypes = None
) -> Response:
+ timeout = None if timeout is None else TimeoutConfig(timeout)
+
# Start sending the request.
method = request.method.encode("ascii")
target = request.url.full_path.encode("ascii")
self.h11_state.send(event)
await self.writer.close()
- async def _body_iter(self, timeout: OptionalTimeout) -> typing.AsyncIterator[bytes]:
+ async def _body_iter(
+ self, timeout: TimeoutConfig = None
+ ) -> typing.AsyncIterator[bytes]:
event = await self._receive_event(timeout)
while isinstance(event, h11.Data):
yield event.data
event = await self._receive_event(timeout)
assert isinstance(event, h11.EndOfMessage)
- async def _send_event(self, event: H11Event, timeout: OptionalTimeout) -> None:
+ async def _send_event(self, event: H11Event, timeout: TimeoutConfig = None) -> None:
data = self.h11_state.send(event)
await self.writer.write(data, timeout)
- async def _receive_event(self, timeout: OptionalTimeout) -> H11Event:
+ async def _receive_event(self, timeout: TimeoutConfig = None) -> H11Event:
event = self.h11_state.next_event()
while event is h11.NEED_DATA:
import h2.connection
import h2.events
-from ..config import (
- DEFAULT_SSL_CONFIG,
- DEFAULT_TIMEOUT_CONFIG,
- SSLConfig,
- TimeoutConfig,
-)
+from ..config import DEFAULT_TIMEOUT_CONFIG, TimeoutConfig, TimeoutTypes
from ..exceptions import ConnectTimeout, ReadTimeout
from ..interfaces import BaseReader, BaseWriter, Dispatcher
from ..models import Request, Response
-OptionalTimeout = typing.Optional[TimeoutConfig]
-
class HTTP2Connection:
READ_NUM_BYTES = 4096
self.initialized = False
async def send(
- self, request: Request, stream: bool = False, timeout: TimeoutConfig = None
+ self, request: Request, stream: bool = False, timeout: TimeoutTypes = None
) -> Response:
+ timeout = None if timeout is None else TimeoutConfig(timeout)
+
# Start sending the request.
if not self.initialized:
self.initiate_connection()
self.writer.write_no_block(data_to_send)
self.initialized = True
- async def send_headers(self, request: Request, timeout: OptionalTimeout) -> int:
+ async def send_headers(
+ self, request: Request, timeout: TimeoutConfig = None
+ ) -> int:
stream_id = self.h2_state.get_next_available_stream_id()
headers = [
(b":method", request.method.encode("ascii")),
return stream_id
async def send_data(
- self, stream_id: int, data: bytes, timeout: OptionalTimeout
+ self, stream_id: int, data: bytes, timeout: TimeoutConfig = None
) -> None:
self.h2_state.send_data(stream_id, data)
data_to_send = self.h2_state.data_to_send()
await self.writer.write(data_to_send, timeout)
- async def end_stream(self, stream_id: int, timeout: OptionalTimeout) -> None:
+ async def end_stream(self, stream_id: int, timeout: TimeoutConfig = None) -> None:
self.h2_state.end_stream(stream_id)
data_to_send = self.h2_state.data_to_send()
await self.writer.write(data_to_send, timeout)
async def body_iter(
- self, stream_id: int, timeout: OptionalTimeout
+ self, stream_id: int, timeout: TimeoutConfig = None
) -> typing.AsyncIterator[bytes]:
while True:
event = await self.receive_event(stream_id, timeout)
break
async def receive_event(
- self, stream_id: int, timeout: OptionalTimeout
+ self, stream_id: int, timeout: TimeoutConfig = None
) -> h2.events.Event:
while not self.events[stream_id]:
data = await self.reader.read(self.READ_NUM_BYTES, timeout)
import typing
from types import TracebackType
-from .config import PoolLimits, SSLConfig, TimeoutConfig
+from .config import CertTypes, PoolLimits, TimeoutConfig, TimeoutTypes, VerifyTypes
from .models import (
URL,
Headers,
URLTypes,
)
-OptionalTimeout = typing.Optional[TimeoutConfig]
-
class Protocol(str, enum.Enum):
HTTP_11 = "HTTP/1.1"
params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None
) -> Response:
request = Request(method, url, data=data, params=params, headers=headers)
self.prepare_request(request)
- response = await self.send(request, stream=stream, ssl=ssl, timeout=timeout)
+ response = await self.send(
+ request, stream=stream, verify=verify, cert=cert, timeout=timeout
+ )
return response
def prepare_request(self, request: Request) -> None:
self,
request: Request,
stream: bool = False,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
raise NotImplementedError() # pragma: nocover
backend, or for stand-alone test cases.
"""
- async def read(self, n: int, timeout: OptionalTimeout = None) -> bytes:
+ async def read(self, n: int, timeout: TimeoutConfig = None) -> bytes:
raise NotImplementedError() # pragma: no cover
def write_no_block(self, data: bytes) -> None:
raise NotImplementedError() # pragma: no cover
- async def write(self, data: bytes, timeout: OptionalTimeout = None) -> None:
+ async def write(self, data: bytes, timeout: TimeoutConfig = None) -> None:
raise NotImplementedError() # pragma: no cover
async def close(self) -> None:
from httpcore import (
URL,
+ CertTypes,
Client,
Dispatcher,
Request,
Response,
- SSLConfig,
- TimeoutConfig,
+ TimeoutTypes,
+ VerifyTypes,
)
self,
request: Request,
stream: bool = False,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
body = json.dumps({"auth": request.headers.get("Authorization")}).encode()
return Response(200, content=body, request=request)
from httpcore import (
URL,
+ CertTypes,
Client,
Cookies,
Dispatcher,
Request,
Response,
- SSLConfig,
- TimeoutConfig,
+ TimeoutTypes,
+ VerifyTypes,
)
self,
request: Request,
stream: bool = False,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
if request.url.path.startswith("/echo_cookies"):
body = json.dumps({"cookies": request.headers.get("Cookie")}).encode()
from httpcore import (
URL,
AsyncClient,
+ CertTypes,
Dispatcher,
RedirectBodyUnavailable,
RedirectLoop,
Request,
Response,
- SSLConfig,
- TimeoutConfig,
+ TimeoutTypes,
TooManyRedirects,
+ VerifyTypes,
codes,
)
self,
request: Request,
stream: bool = False,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
) -> Response:
if request.url.path == "/redirect_301":
status_code = codes.MOVED_PERMANENTLY
@pytest.mark.asyncio
async def test_get(server):
conn = HTTPConnection(origin="http://127.0.0.1:8000/")
- request = Request("GET", "http://127.0.0.1:8000/")
- request.prepare()
- response = await conn.send(request)
+ response = await conn.request("GET", "http://127.0.0.1:8000/")
assert response.status_code == 200
assert response.content == b"Hello, world!"
@pytest.mark.asyncio
-async def test_https_get(https_server):
- http = HTTPConnection(origin="https://127.0.0.1:8001/", ssl=SSLConfig(verify=False))
- response = await http.request("GET", "https://127.0.0.1:8001/")
+async def test_post(server):
+ conn = HTTPConnection(origin="http://127.0.0.1:8000/")
+ response = await conn.request("GET", "http://127.0.0.1:8000/", data=b"Hello, world!")
+ assert response.status_code == 200
+
+
+@pytest.mark.asyncio
+async def test_https_get_with_ssl_defaults(https_server):
+ """
+ An HTTPS request, with default SSL configuration set on the client.
+ """
+ conn = HTTPConnection(origin="https://127.0.0.1:8001/", verify=False)
+ response = await conn.request("GET", "https://127.0.0.1:8001/")
assert response.status_code == 200
assert response.content == b"Hello, world!"
@pytest.mark.asyncio
-async def test_post(server):
- conn = HTTPConnection(origin="http://127.0.0.1:8000/")
- request = Request("GET", "http://127.0.0.1:8000/", data=b"Hello, world!")
- request.prepare()
- response = await conn.send(request)
+async def test_https_get_with_sll_overrides(https_server):
+ """
+ An HTTPS request, with SSL configuration set on the request.
+ """
+ conn = HTTPConnection(origin="https://127.0.0.1:8001/")
+ response = await conn.request("GET", "https://127.0.0.1:8001/", verify=False)
assert response.status_code == 200
+ assert response.content == b"Hello, world!"
def test_limits_eq():
limits = httpcore.PoolLimits(hard_limit=100)
assert limits == httpcore.PoolLimits(hard_limit=100)
+
+
+def test_timeout_from_tuple():
+ timeout = httpcore.TimeoutConfig(timeout=(5.0, 5.0, 5.0))
+ assert timeout == httpcore.TimeoutConfig(timeout=5.0)
+
+
+def test_timeout_from_config_instance():
+ timeout = httpcore.TimeoutConfig(timeout=(5.0))
+ assert httpcore.TimeoutConfig(timeout) == httpcore.TimeoutConfig(timeout=5.0)