__title__ = "httpx"
__description__ = "A next generation HTTP client, for Python 3."
-__version__ = "0.20.0"
+__version__ = "0.21.0"
headers: HeaderTypes = None,
cookies: CookieTypes = None,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Request:
"""
Build and return a request instance.
headers = self._merge_headers(headers)
cookies = self._merge_cookies(cookies)
params = self._merge_queryparams(params)
- timeout = (
- self.timeout if isinstance(timeout, UseClientDefault) else Timeout(timeout)
- )
+ extensions = {} if extensions is None else extensions
+ if "timeout" not in extensions:
+ timeout = (
+ self.timeout
+ if isinstance(timeout, UseClientDefault)
+ else Timeout(timeout)
+ )
+ extensions["timeout"] = timeout.as_dict()
return Request(
method,
url,
params=params,
headers=headers,
cookies=cookies,
- extensions={"timeout": timeout.as_dict()},
+ extensions=extensions,
)
def _merge_url(self, url: URLTypes) -> URL:
stream = self._redirect_stream(request, method)
cookies = Cookies(self.cookies)
return Request(
- method=method, url=url, headers=headers, cookies=cookies, stream=stream
+ method=method,
+ url=url,
+ headers=headers,
+ cookies=cookies,
+ stream=stream,
+ extensions=request.extensions,
)
def _redirect_method(self, request: Request, response: Response) -> str:
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Build and send a request.
headers=headers,
cookies=cookies,
timeout=timeout,
+ extensions=extensions,
)
return self.send(request, auth=auth, follow_redirects=follow_redirects)
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> typing.Iterator[Response]:
"""
Alternative to `httpx.request()` that streams the response body
headers=headers,
cookies=cookies,
timeout=timeout,
+ extensions=extensions,
)
response = self.send(
request=request,
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `GET` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
def options(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send an `OPTIONS` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
def head(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `HEAD` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
def post(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `POST` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
def put(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `PUT` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
def patch(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `PATCH` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
def delete(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `DELETE` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
def close(self) -> None:
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Build and send a request.
headers=headers,
cookies=cookies,
timeout=timeout,
+ extensions=extensions,
)
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> typing.AsyncIterator[Response]:
"""
Alternative to `httpx.request()` that streams the response body
headers=headers,
cookies=cookies,
timeout=timeout,
+ extensions=extensions,
)
response = await self.send(
request=request,
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `GET` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
async def options(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send an `OPTIONS` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
async def head(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `HEAD` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
async def post(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `POST` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
async def put(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `PUT` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
async def patch(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `PATCH` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
async def delete(
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
+ extensions: dict = None,
) -> Response:
"""
Send a `DELETE` request.
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
+ extensions=extensions,
)
async def aclose(self) -> None:
import typing
import click
+import httpcore
import pygments.lexers
import pygments.util
import rich.console
from ._client import Client
from ._exceptions import RequestError
-from ._models import Request, Response
+from ._models import Response
+from ._status_codes import codes
def print_help() -> None:
return "" # pragma: nocover
-def format_request_headers(request: Request, http2: bool = False) -> str:
+def format_request_headers(request: httpcore.Request, http2: bool = False) -> str:
version = "HTTP/2" if http2 else "HTTP/1.1"
headers = [
- (name.lower() if http2 else name, value) for name, value in request.headers.raw
+ (name.lower() if http2 else name, value) for name, value in request.headers
]
- target = request.url.raw[-1].decode("ascii")
- lines = [f"{request.method} {target} {version}"] + [
+ method = request.method.decode("ascii")
+ target = request.url.target.decode("ascii")
+ lines = [f"{method} {target} {version}"] + [
f"{name.decode('ascii')}: {value.decode('ascii')}" for name, value in headers
]
return "\n".join(lines)
-def format_response_headers(response: Response) -> str:
- lines = [
- f"{response.http_version} {response.status_code} {response.reason_phrase}"
- ] + [
- f"{name.decode('ascii')}: {value.decode('ascii')}"
- for name, value in response.headers.raw
+def format_response_headers(
+ http_version: bytes,
+ status: int,
+ reason_phrase: typing.Optional[bytes],
+ headers: typing.List[typing.Tuple[bytes, bytes]],
+) -> str:
+ version = http_version.decode("ascii")
+ reason = (
+ codes.get_reason_phrase(status)
+ if reason_phrase is None
+ else reason_phrase.decode("ascii")
+ )
+ lines = [f"{version} {status} {reason}"] + [
+ f"{name.decode('ascii')}: {value.decode('ascii')}" for name, value in headers
]
return "\n".join(lines)
-def print_request_headers(request: Request, http2: bool = False) -> None:
+def print_request_headers(request: httpcore.Request, http2: bool = False) -> None:
console = rich.console.Console()
http_text = format_request_headers(request, http2=http2)
syntax = rich.syntax.Syntax(http_text, "http", theme="ansi_dark", word_wrap=True)
console.print(syntax)
-def print_response_headers(response: Response) -> None:
+def print_response_headers(
+ http_version: bytes,
+ status: int,
+ reason_phrase: typing.Optional[bytes],
+ headers: typing.List[typing.Tuple[bytes, bytes]],
+) -> None:
console = rich.console.Console()
- http_text = format_response_headers(response)
+ http_text = format_response_headers(http_version, status, reason_phrase, headers)
syntax = rich.syntax.Syntax(http_text, "http", theme="ansi_dark", word_wrap=True)
console.print(syntax)
-
-
-def print_delimiter() -> None:
- console = rich.console.Console()
syntax = rich.syntax.Syntax("", "http", theme="ansi_dark", word_wrap=True)
console.print(syntax)
-def print_redirects(response: Response) -> None:
- if response.has_redirect_location:
- response.read()
- print_response_headers(response)
- print_response(response)
-
-
def print_response(response: Response) -> None:
console = rich.console.Console()
lexer_name = get_lexer_for_response(response)
console.print(response.text)
+def format_certificate(cert: dict) -> str: # pragma: nocover
+ lines = []
+ for key, value in cert.items():
+ if isinstance(value, (list, tuple)):
+ lines.append(f"* {key}:")
+ for item in value:
+ if key in ("subject", "issuer"):
+ for sub_item in item:
+ lines.append(f"* {sub_item[0]}: {sub_item[1]!r}")
+ elif isinstance(item, tuple) and len(item) == 2:
+ lines.append(f"* {item[0]}: {item[1]!r}")
+ else:
+ lines.append(f"* {item!r}")
+ else:
+ lines.append(f"* {key}: {value!r}")
+ return "\n".join(lines)
+
+
+def trace(name: str, info: dict, verbose: bool = False) -> None:
+ console = rich.console.Console()
+ if name == "connection.connect_tcp.started" and verbose:
+ host = info["host"]
+ console.print(f"* Connecting to {host!r}")
+ elif name == "connection.connect_tcp.complete" and verbose:
+ stream = info["return_value"]
+ server_addr = stream.get_extra_info("server_addr")
+ console.print(f"* Connected to {server_addr[0]!r} on port {server_addr[1]}")
+ elif name == "connection.start_tls.complete" and verbose: # pragma: nocover
+ stream = info["return_value"]
+ ssl_object = stream.get_extra_info("ssl_object")
+ version = ssl_object.version()
+ cipher = ssl_object.cipher()
+ server_cert = ssl_object.getpeercert()
+ alpn = ssl_object.selected_alpn_protocol()
+ console.print(f"* SSL established using {version!r} / {cipher[0]!r}")
+ console.print(f"* Selected ALPN protocol: {alpn!r}")
+ if server_cert:
+ console.print("* Server certificate:")
+ console.print(format_certificate(server_cert))
+ elif name == "http11.send_request_headers.started" and verbose:
+ request = info["request"]
+ print_request_headers(request, http2=False)
+ elif name == "http2.send_request_headers.started" and verbose: # pragma: nocover
+ request = info["request"]
+ print_request_headers(request, http2=True)
+ elif name == "http11.receive_response_headers.complete":
+ http_version, status, reason_phrase, headers = info["return_value"]
+ print_response_headers(http_version, status, reason_phrase, headers)
+ elif name == "http2.receive_response_headers.complete": # pragma: nocover
+ status, headers = info["return_value"]
+ http_version = b"HTTP/2"
+ reason_phrase = None
+ print_response_headers(http_version, status, reason_phrase, headers)
+
+
def download_response(response: Response, download: typing.BinaryIO) -> None:
console = rich.console.Console()
syntax = rich.syntax.Syntax("", "http", theme="ansi_dark", word_wrap=True)
if not method:
method = "POST" if content or data or files or json else "GET"
- event_hooks: typing.Dict[str, typing.List[typing.Callable]] = {}
- if verbose:
- event_hooks["request"] = [functools.partial(print_request_headers, http2=http2)]
- if follow_redirects:
- event_hooks["response"] = [print_redirects]
-
try:
with Client(
proxies=proxies,
timeout=timeout,
verify=verify,
http2=http2,
- event_hooks=event_hooks,
) as client:
with client.stream(
method,
cookies=dict(cookies),
auth=auth,
follow_redirects=follow_redirects,
+ extensions={"trace": functools.partial(trace, verbose=verbose)},
) as response:
- print_response_headers(response)
-
if download is not None:
download_response(response, download)
else:
response.read()
if response.content:
- print_delimiter()
print_response(response)
except RequestError as exc:
console = rich.console.Console()
- console.print(f"{type(exc).__name__}: {exc}")
+ console.print(f"[red]{type(exc).__name__}[/red]: {str(exc)}")
sys.exit(1)
sys.exit(0 if response.is_success else 1)
* uds: str
* local_address: str
* retries: int
-* backend: str ("auto", "asyncio", "trio", "curio", "anyio", "sync")
Example usages...
from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context
from .._exceptions import (
- CloseError,
ConnectError,
ConnectTimeout,
LocalProtocolError,
httpcore.ConnectError: ConnectError,
httpcore.ReadError: ReadError,
httpcore.WriteError: WriteError,
- httpcore.CloseError: CloseError,
httpcore.ProxyError: ProxyError,
httpcore.UnsupportedProtocol: UnsupportedProtocol,
httpcore.ProtocolError: ProtocolError,
class ResponseStream(SyncByteStream):
- def __init__(self, httpcore_stream: httpcore.SyncByteStream):
+ def __init__(self, httpcore_stream: typing.Iterable[bytes]):
self._httpcore_stream = httpcore_stream
def __iter__(self) -> typing.Iterator[bytes]:
yield part
def close(self) -> None:
- with map_httpcore_exceptions():
- self._httpcore_stream.close()
+ if hasattr(self._httpcore_stream, "close"):
+ self._httpcore_stream.close() # type: ignore
class HTTPTransport(BaseTransport):
uds: str = None,
local_address: str = None,
retries: int = 0,
- backend: str = "sync",
) -> None:
ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
if proxy is None:
- self._pool = httpcore.SyncConnectionPool(
+ self._pool = httpcore.ConnectionPool(
ssl_context=ssl_context,
max_connections=limits.max_connections,
max_keepalive_connections=limits.max_keepalive_connections,
uds=uds,
local_address=local_address,
retries=retries,
- backend=backend,
)
else:
- self._pool = httpcore.SyncHTTPProxy(
- proxy_url=proxy.url.raw,
+ self._pool = httpcore.HTTPProxy(
+ proxy_url=httpcore.URL(
+ scheme=proxy.url.raw_scheme,
+ host=proxy.url.raw_host,
+ port=proxy.url.port,
+ target=proxy.url.raw_path,
+ ),
proxy_headers=proxy.headers.raw,
ssl_context=ssl_context,
max_connections=limits.max_connections,
max_keepalive_connections=limits.max_keepalive_connections,
keepalive_expiry=limits.keepalive_expiry,
- http2=http2,
- backend=backend,
)
def __enter__(self: T) -> T: # Use generics for subclass support.
) -> Response:
assert isinstance(request.stream, SyncByteStream)
+ req = httpcore.Request(
+ method=request.method,
+ url=httpcore.URL(
+ scheme=request.url.raw_scheme,
+ host=request.url.raw_host,
+ port=request.url.port,
+ target=request.url.raw_path,
+ ),
+ headers=request.headers.raw,
+ content=request.stream,
+ extensions=request.extensions,
+ )
with map_httpcore_exceptions():
- status_code, headers, byte_stream, extensions = self._pool.handle_request(
- method=request.method.encode("ascii"),
- url=request.url.raw,
- headers=request.headers.raw,
- stream=httpcore.IteratorByteStream(iter(request.stream)),
- extensions=request.extensions,
- )
+ resp = self._pool.handle_request(req)
- stream = ResponseStream(byte_stream)
+ assert isinstance(resp.stream, typing.Iterable)
return Response(
- status_code, headers=headers, stream=stream, extensions=extensions
+ status_code=resp.status,
+ headers=resp.headers,
+ stream=ResponseStream(resp.stream),
+ extensions=resp.extensions,
)
def close(self) -> None:
class AsyncResponseStream(AsyncByteStream):
- def __init__(self, httpcore_stream: httpcore.AsyncByteStream):
+ def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]):
self._httpcore_stream = httpcore_stream
async def __aiter__(self) -> typing.AsyncIterator[bytes]:
yield part
async def aclose(self) -> None:
- with map_httpcore_exceptions():
- await self._httpcore_stream.aclose()
+ if hasattr(self._httpcore_stream, "aclose"):
+ await self._httpcore_stream.aclose() # type: ignore
class AsyncHTTPTransport(AsyncBaseTransport):
uds: str = None,
local_address: str = None,
retries: int = 0,
- backend: str = "auto",
) -> None:
ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
uds=uds,
local_address=local_address,
retries=retries,
- backend=backend,
)
else:
self._pool = httpcore.AsyncHTTPProxy(
- proxy_url=proxy.url.raw,
+ proxy_url=httpcore.URL(
+ scheme=proxy.url.raw_scheme,
+ host=proxy.url.raw_host,
+ port=proxy.url.port,
+ target=proxy.url.raw_path,
+ ),
proxy_headers=proxy.headers.raw,
ssl_context=ssl_context,
max_connections=limits.max_connections,
max_keepalive_connections=limits.max_keepalive_connections,
keepalive_expiry=limits.keepalive_expiry,
- http2=http2,
- backend=backend,
)
async def __aenter__(self: A) -> A: # Use generics for subclass support.
) -> Response:
assert isinstance(request.stream, AsyncByteStream)
+ req = httpcore.Request(
+ method=request.method,
+ url=httpcore.URL(
+ scheme=request.url.raw_scheme,
+ host=request.url.raw_host,
+ port=request.url.port,
+ target=request.url.raw_path,
+ ),
+ headers=request.headers.raw,
+ content=request.stream,
+ extensions=request.extensions,
+ )
with map_httpcore_exceptions():
- (
- status_code,
- headers,
- byte_stream,
- extensions,
- ) = await self._pool.handle_async_request(
- method=request.method.encode("ascii"),
- url=request.url.raw,
- headers=request.headers.raw,
- stream=httpcore.AsyncIteratorByteStream(request.stream.__aiter__()),
- extensions=request.extensions,
- )
+ resp = await self._pool.handle_async_request(req)
- stream = AsyncResponseStream(byte_stream)
+ assert isinstance(resp.stream, typing.AsyncIterable)
return Response(
- status_code, headers=headers, stream=stream, extensions=extensions
+ status_code=resp.status,
+ headers=resp.headers,
+ stream=AsyncResponseStream(resp.stream),
+ extensions=resp.extensions,
)
async def aclose(self) -> None:
"charset_normalizer",
"sniffio",
"rfc3986[idna2008]>=1.3,<2",
- "httpcore>=0.13.3,<0.14.0",
+ "httpcore>=0.14.0,<0.15.0",
"async_generator; python_version < '3.7'"
],
extras_require={
Given a URL string, return the origin in the raw tuple format that
`httpcore` uses for it's representation.
"""
- DEFAULT_PORTS = {b"http": 80, b"https": 443}
- scheme, host, explicit_port = httpx.URL(url).raw[:3]
- default_port = DEFAULT_PORTS[scheme]
- port = default_port if explicit_port is None else explicit_port
- return scheme, host, port
+ scheme, host, port = httpx.URL(url).raw[:3]
+ return httpcore.URL(scheme=scheme, host=host, port=port, target="/")
@pytest.mark.parametrize(
assert pattern in client._mounts
proxy = client._mounts[pattern]
assert isinstance(proxy, httpx.HTTPTransport)
- assert isinstance(proxy._pool, httpcore.SyncHTTPProxy)
- assert proxy._pool.proxy_origin == url_to_origin(url)
+ assert isinstance(proxy._pool, httpcore.HTTPProxy)
+ assert proxy._pool._proxy_url == url_to_origin(url)
assert len(expected_proxies) == len(client._mounts)
assert transport is client._transport
else:
assert isinstance(transport, httpx.HTTPTransport)
- assert isinstance(transport._pool, httpcore.SyncHTTPProxy)
- assert transport._pool.proxy_origin == url_to_origin(expected)
+ assert isinstance(transport._pool, httpcore.HTTPProxy)
+ assert transport._pool._proxy_url == url_to_origin(expected)
@pytest.mark.asyncio
if expected is None:
assert transport == client._transport
else:
- assert transport._pool.proxy_origin == url_to_origin(expected)
+ assert transport._pool._proxy_url == url_to_origin(expected)
@pytest.mark.parametrize(
if isinstance(value, type)
and issubclass(value, Exception)
and value not in HTTPCORE_EXC_MAP
+ and value is not httpcore.ConnectionNotAvailable
]
if not_mapped: # pragma: nocover
def close(self):
pass
- class CloseFailedStream:
- def __iter__(self):
- yield b""
-
- def close(self):
- raise httpcore.CloseError()
-
with mock.patch(
- "httpcore.SyncConnectionPool.handle_request", side_effect=connect_failed
+ "httpcore.ConnectionPool.handle_request", side_effect=connect_failed
):
with pytest.raises(httpx.ConnectError):
httpx.get(server.url)
with mock.patch(
- "httpcore.SyncConnectionPool.handle_request",
- return_value=(200, [], TimeoutStream(), {}),
+ "httpcore.ConnectionPool.handle_request",
+ return_value=httpcore.Response(
+ 200, headers=[], content=TimeoutStream(), extensions={}
+ ),
):
with pytest.raises(httpx.ReadTimeout):
httpx.get(server.url)
- with mock.patch(
- "httpcore.SyncConnectionPool.handle_request",
- return_value=(200, [], CloseFailedStream(), {}),
- ):
- with pytest.raises(httpx.CloseError):
- httpx.get(server.url)
-
def test_httpx_exceptions_exposed() -> None:
"""
"server: uvicorn",
"location: /",
"Transfer-Encoding: chunked",
+ "",
]
result = runner.invoke(httpx.main, [url, "-v"])
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
+ "* Connecting to '127.0.0.1'",
+ "* Connected to '127.0.0.1' on port 8000",
"GET / HTTP/1.1",
f"Host: {server.url.netloc.decode('ascii')}",
"Accept: */*",
print(result.output)
assert result.exit_code == 0
assert remove_date_header(splitlines(result.output)) == [
+ "* Connecting to '127.0.0.1'",
+ "* Connected to '127.0.0.1' on port 8000",
"GET / HTTP/1.1",
f"Host: {server.url.netloc.decode('ascii')}",
"Accept: */*",
timeout = httpx.Timeout(None, pool=1e-4)
async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
- async with client.stream("GET", server.url):
- with pytest.raises(httpx.PoolTimeout):
- await client.get("http://localhost:8000/")
+ with pytest.raises(httpx.PoolTimeout):
+ async with client.stream("GET", server.url):
+ await client.get(server.url)