history: typing.List[Response],
) -> Response:
auth_flow = auth.sync_auth_flow(request)
- request = next(auth_flow)
+ try:
+ request = next(auth_flow)
- for hook in self._event_hooks["request"]:
- hook(request)
+ for hook in self._event_hooks["request"]:
+ hook(request)
- while True:
- response = self._send_handling_redirects(
- request,
- timeout=timeout,
- allow_redirects=allow_redirects,
- history=history,
- )
- try:
+ while True:
+ response = self._send_handling_redirects(
+ request,
+ timeout=timeout,
+ allow_redirects=allow_redirects,
+ history=history,
+ )
try:
- next_request = auth_flow.send(response)
- except StopIteration:
- return response
+ try:
+ next_request = auth_flow.send(response)
+ except StopIteration:
+ return response
- response.history = list(history)
- response.read()
- request = next_request
- history.append(response)
+ response.history = list(history)
+ response.read()
+ request = next_request
+ history.append(response)
- except Exception as exc:
- response.close()
- raise exc
+ except Exception as exc:
+ response.close()
+ raise exc
+ finally:
+ auth_flow.close()
def _send_handling_redirects(
self,
transport.__exit__(exc_type, exc_value, traceback)
def __del__(self) -> None:
- if self._state == ClientState.OPENED:
+ # We use 'getattr' here, to manage the case where '__del__()' is called
+ # on a partically initiallized instance that raised an exception during
+ # the call to '__init__()'.
+ if getattr(self, "_state", None) == ClientState.OPENED: # noqa: B009
self.close()
history: typing.List[Response],
) -> Response:
auth_flow = auth.async_auth_flow(request)
- request = await auth_flow.__anext__()
+ try:
+ request = await auth_flow.__anext__()
- for hook in self._event_hooks["request"]:
- await hook(request)
+ for hook in self._event_hooks["request"]:
+ await hook(request)
- while True:
- response = await self._send_handling_redirects(
- request,
- timeout=timeout,
- allow_redirects=allow_redirects,
- history=history,
- )
- try:
+ while True:
+ response = await self._send_handling_redirects(
+ request,
+ timeout=timeout,
+ allow_redirects=allow_redirects,
+ history=history,
+ )
try:
- next_request = await auth_flow.asend(response)
- except StopAsyncIteration:
- return response
+ try:
+ next_request = await auth_flow.asend(response)
+ except StopAsyncIteration:
+ return response
- response.history = list(history)
- await response.aread()
- request = next_request
- history.append(response)
+ response.history = list(history)
+ await response.aread()
+ request = next_request
+ history.append(response)
- except Exception as exc:
- await response.aclose()
- raise exc
+ except Exception as exc:
+ await response.aclose()
+ raise exc
+ finally:
+ await auth_flow.aclose()
async def _send_handling_redirects(
self,
await proxy.__aexit__(exc_type, exc_value, traceback)
def __del__(self) -> None:
- if self._state == ClientState.OPENED:
+ # We use 'getattr' here, to manage the case where '__del__()' is called
+ # on a partically initiallized instance that raised an exception during
+ # the call to '__init__()'.
+ if getattr(self, "_state", None) == ClientState.OPENED: # noqa: B009
# Unlike the sync case, we cannot silently close the client when
# it is garbage collected, because `.aclose()` is an async operation,
# but `__del__` is not.
+"""
+The _compat module is used for code which requires branching between different
+Python environments. It is excluded from the code coverage checks.
+"""
+import ssl
+import sys
+
# `contextlib.asynccontextmanager` exists from Python 3.7 onwards.
# For 3.6 we require the `async_generator` package for a backported version.
try:
from contextlib import asynccontextmanager # type: ignore
-except ImportError: # pragma: no cover
+except ImportError:
from async_generator import asynccontextmanager # type: ignore # noqa
+
+
+def set_minimum_tls_version_1_2(context: ssl.SSLContext) -> None:
+ if sys.version_info >= (3, 10):
+ context.minimum_version = ssl.TLSVersion.TLSv1_2
+ else:
+ # These become deprecated in favor of 'context.minimum_version'
+ # from Python 3.10 onwards.
+ context.options |= ssl.OP_NO_SSLv2
+ context.options |= ssl.OP_NO_SSLv3
+ context.options |= ssl.OP_NO_TLSv1
+ context.options |= ssl.OP_NO_TLSv1_1
import certifi
+from ._compat import set_minimum_tls_version_1_2
from ._models import URL, Headers
from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes
from ._utils import get_ca_bundle_from_env, get_logger
Return an SSL context for unverified connections.
"""
context = self._create_default_ssl_context()
- context.verify_mode = ssl.CERT_NONE
context.check_hostname = False
+ context.verify_mode = ssl.CERT_NONE
self._load_client_certs(context)
return context
Creates the default SSLContext object that's used for both verified
and unverified connections.
"""
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
- context.options |= ssl.OP_NO_SSLv2
- context.options |= ssl.OP_NO_SSLv3
- context.options |= ssl.OP_NO_TLSv1
- context.options |= ssl.OP_NO_TLSv1_1
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ set_minimum_tls_version_1_2(context)
context.options |= ssl.OP_NO_COMPRESSION
context.set_ciphers(DEFAULT_CIPHERS)
[tool:pytest]
addopts = -rxXs
+filterwarnings =
+ error
+ default:::uvicorn
markers =
copied_from(source, changes=None): mark test as copied from somewhere else, along with a description of changes made to accodomate e.g. our test setup
[coverage:run]
-omit = venv/*
+omit = venv/*, httpx/_compat.py
include = httpx/*, tests/*
"e": True,
"f": "",
}
- files = {"file": ("name.txt", open(path, "rb"))}
-
- with mock.patch("os.urandom", return_value=os.urandom(16)):
- boundary = os.urandom(16).hex()
-
- headers, stream = encode_request(data=data, files=files)
- assert isinstance(stream, typing.Iterable)
-
- content = (
- '--{0}\r\nContent-Disposition: form-data; name="a"\r\n\r\n1\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="b"\r\n\r\nC\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="c"\r\n\r\n11\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="c"\r\n\r\n22\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="c"\r\n\r\n33\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="d"\r\n\r\n\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="e"\r\n\r\ntrue\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="f"\r\n\r\n\r\n'
- '--{0}\r\nContent-Disposition: form-data; name="file";'
- ' filename="name.txt"\r\n'
- "Content-Type: text/plain\r\n\r\n<file content>\r\n"
- "--{0}--\r\n"
- "".format(boundary).encode("ascii")
- )
- assert headers == {
- "Content-Type": f"multipart/form-data; boundary={boundary}",
- "Content-Length": str(len(content)),
- }
- assert content == b"".join(stream)
+ with open(path, "rb") as input_file:
+ files = {"file": ("name.txt", input_file)}
+
+ with mock.patch("os.urandom", return_value=os.urandom(16)):
+ boundary = os.urandom(16).hex()
+
+ headers, stream = encode_request(data=data, files=files)
+ assert isinstance(stream, typing.Iterable)
+
+ content = (
+ '--{0}\r\nContent-Disposition: form-data; name="a"\r\n\r\n1\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="b"\r\n\r\nC\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="c"\r\n\r\n11\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="c"\r\n\r\n22\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="c"\r\n\r\n33\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="d"\r\n\r\n\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="e"\r\n\r\ntrue\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="f"\r\n\r\n\r\n'
+ '--{0}\r\nContent-Disposition: form-data; name="file";'
+ ' filename="name.txt"\r\n'
+ "Content-Type: text/plain\r\n\r\n<file content>\r\n"
+ "--{0}--\r\n"
+ "".format(boundary).encode("ascii")
+ )
+ assert headers == {
+ "Content-Type": f"multipart/form-data; boundary={boundary}",
+ "Content-Length": str(len(content)),
+ }
+ assert content == b"".join(stream)
def test_multipart_encode_unicode_file_contents() -> None: