import asyncio
-import os
import ssl
import typing
+from pathlib import Path
import certifi
"""
if isinstance(self.verify, bool):
ca_bundle_path = DEFAULT_CA_BUNDLE_PATH
- elif os.path.exists(self.verify):
- ca_bundle_path = self.verify
+ elif Path(self.verify).exists():
+ ca_bundle_path = Path(self.verify)
else:
raise IOError(
"Could not find a suitable TLS CA certificate bundle, "
except AttributeError: # pragma: nocover
pass
- if os.path.isfile(ca_bundle_path):
- context.load_verify_locations(cafile=ca_bundle_path)
- elif os.path.isdir(ca_bundle_path):
- context.load_verify_locations(capath=ca_bundle_path)
+ if ca_bundle_path.is_file():
+ context.load_verify_locations(cafile=str(ca_bundle_path))
+ elif ca_bundle_path.is_dir():
+ context.load_verify_locations(capath=str(ca_bundle_path))
if self.cert is not None:
if isinstance(self.cert, str):
DEFAULT_SSL_CONFIG = SSLConfig(cert=None, verify=True)
DEFAULT_TIMEOUT_CONFIG = TimeoutConfig(timeout=5.0)
DEFAULT_POOL_LIMITS = PoolLimits(soft_limit=10, hard_limit=100, pool_timeout=5.0)
-DEFAULT_CA_BUNDLE_PATH = certifi.where()
+DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())
DEFAULT_MAX_REDIRECTS = 20
import re
import typing
from io import BytesIO
+from pathlib import Path
_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
) -> None:
self.name = name
if not isinstance(value, tuple):
- self.filename = os.path.basename(getattr(value, "name", "upload"))
+ self.filename = Path(getattr(value, "name", "upload")).name
self.file = value # type: typing.Union[typing.IO[str], typing.IO[bytes]]
self.content_type = self.guess_content_type()
else:
def _format_param(name: str, value: typing.Union[str, bytes]) -> bytes:
if isinstance(value, bytes):
value = value.decode()
-
+
def replacer(match: typing.Match[str]) -> str:
return _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)]
import netrc
import os
import typing
+from pathlib import Path
def normalize_header_key(value: typing.AnyStr, encoding: str = None) -> bytes:
return None
-NETRC_STATIC_FILES = tuple(
- os.path.expanduser(path) for path in ("~/.netrc", "~/_netrc")
-)
+NETRC_STATIC_FILES = (Path("~/.netrc"), Path("~/_netrc"))
def get_netrc_login(host: str) -> typing.Optional[typing.Tuple[str, str, str]]:
- NETRC_FILES = (
- (os.environ["NETRC"],) if "NETRC" in os.environ else NETRC_STATIC_FILES
- )
+ NETRC_FILES = (Path(os.getenv("NETRC", "")),) + NETRC_STATIC_FILES
netrc_path = None
for file_path in NETRC_FILES:
- if os.path.isfile(file_path):
- netrc_path = file_path
+ expanded_path = file_path.expanduser()
+ if expanded_path.is_file():
+ netrc_path = expanded_path
break
if netrc_path is None:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import os
import re
+from pathlib import Path
from setuptools import setup
"""
Return package version as listed in `__version__` in `init.py`.
"""
- with open(os.path.join(package, "__version__.py")) as f:
- return re.search("__version__ = ['\"]([^'\"]+)['\"]", f.read()).group(1)
+ version = Path(package, "__version__.py").read_text()
+ return re.search("__version__ = ['\"]([^'\"]+)['\"]", version).group(1)
def get_long_description():
"""
Return root package and all sub-packages.
"""
- return [
- dirpath
- for dirpath, dirnames, filenames in os.walk(package)
- if os.path.exists(os.path.join(dirpath, "__init__.py"))
- ]
+ return [str(path.parent) for path in Path(package).glob("**/__init__.py")]
setup(
-import os
import ssl
import pytest
@pytest.mark.asyncio
async def test_load_ssl_config_verify_directory():
- path = os.path.dirname(httpx.config.DEFAULT_CA_BUNDLE_PATH)
+ path = httpx.config.DEFAULT_CA_BUNDLE_PATH.parent
ssl_config = httpx.SSLConfig(verify=path)
context = await ssl_config.load_ssl_context()
assert context.verify_mode == ssl.VerifyMode.CERT_REQUIRED