cert: CertTypes = None,
verify: VerifyTypes = True,
stream: bool = False,
+ trust_env: bool = True,
) -> Response:
with Client() as client:
return client.request(
cert=cert,
verify=verify,
timeout=timeout,
+ trust_env=trust_env,
)
cert: CertTypes = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return request(
"GET",
cert=cert,
verify=verify,
timeout=timeout,
+ trust_env=trust_env,
)
cert: CertTypes = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return request(
"OPTIONS",
cert=cert,
verify=verify,
timeout=timeout,
+ trust_env=trust_env,
)
cert: CertTypes = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return request(
"HEAD",
cert=cert,
verify=verify,
timeout=timeout,
+ trust_env=trust_env,
)
cert: CertTypes = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return request(
"POST",
cert=cert,
verify=verify,
timeout=timeout,
+ trust_env=trust_env,
)
cert: CertTypes = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return request(
"PUT",
cert=cert,
verify=verify,
timeout=timeout,
+ trust_env=trust_env,
)
cert: CertTypes = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return request(
"PATCH",
cert=cert,
verify=verify,
timeout=timeout,
+ trust_env=trust_env,
)
cert: CertTypes = None,
verify: VerifyTypes = True,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return request(
"DELETE",
cert=cert,
verify=verify,
timeout=timeout,
+ trust_env=trust_env,
)
URLTypes,
)
from .status_codes import codes
+from .utils import get_netrc_login
class BaseClient:
app: typing.Callable = None,
raise_app_exceptions: bool = True,
backend: ConcurrencyBackend = None,
+ trust_env: bool = True,
):
if backend is None:
backend = AsyncioBackend()
self.max_redirects = max_redirects
self.dispatch = async_dispatch
self.concurrency_backend = backend
+ self.trust_env = trust_env
def merge_cookies(
self, cookies: CookieTypes = None
verify: VerifyTypes = None,
cert: CertTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
if auth is None:
auth = self.auth
if url.scheme not in ("http", "https"):
raise InvalidURL('URL scheme must be "http" or "https".')
- if auth is None and (url.username or url.password):
- auth = HTTPBasicAuth(username=url.username, password=url.password)
+ if auth is None:
+ if url.username or url.password:
+ auth = HTTPBasicAuth(username=url.username, password=url.password)
+ elif trust_env:
+ netrc_login = get_netrc_login(url.authority)
+ if netrc_login:
+ netrc_username, _, netrc_password = netrc_login
+ auth = HTTPBasicAuth(
+ username=netrc_username, password=netrc_password
+ )
if auth is not None:
if isinstance(auth, tuple):
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
return await self.request(
"GET",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
async def options(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
return await self.request(
"OPTIONS",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
async def head(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
return await self.request(
"HEAD",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
async def post(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
return await self.request(
"POST",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
async def put(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
return await self.request(
"PUT",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
async def patch(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
return await self.request(
"PATCH",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
async def delete(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
return await self.request(
"DELETE",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
async def request(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> AsyncResponse:
url = self.base_url.join(url)
headers = self.merge_headers(headers)
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
return response
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
url = self.base_url.join(url)
headers = self.merge_headers(headers)
"verify": verify,
"cert": cert,
"timeout": timeout,
+ "trust_env": trust_env,
}
async_response = concurrency_backend.run(coroutine, *args, **kwargs)
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return self.request(
"GET",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
def options(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return self.request(
"OPTIONS",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
def head(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return self.request(
"HEAD",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
def post(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return self.request(
"POST",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
def put(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return self.request(
"PUT",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
def patch(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return self.request(
"PATCH",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
def delete(
cert: CertTypes = None,
verify: VerifyTypes = None,
timeout: TimeoutTypes = None,
+ trust_env: bool = True,
) -> Response:
return self.request(
"DELETE",
verify=verify,
cert=cert,
timeout=timeout,
+ trust_env=trust_env,
)
def close(self) -> None: