HTTPX supports either `asyncio` or `trio` as an async environment.
-By default it will auto-detect which of those two to use as the backend
+It will auto-detect which of those two to use as the backend
for socket operations and concurrency primitives.
-You can also explicitly select a backend by instantiating a client with the
-`backend` argument...
-
-```python
-client = httpx.AsyncClient(backend='auto') # Autodetection. The default case.
-client = httpx.AsyncClient(backend='asyncio') # Use asyncio as the backend.
-client = httpx.AsyncClient(backend='trio') # Use trio as the backend.
-```
-
### [AsyncIO](https://docs.python.org/3/library/asyncio.html)
AsyncIO is Python's [built-in library](https://docs.python.org/3/library/asyncio.html)
import hstspreload
from ._auth import Auth, AuthTypes, BasicAuth, FunctionAuth
-from ._backends.base import ConcurrencyBackend
from ._config import (
DEFAULT_MAX_REDIRECTS,
DEFAULT_POOL_LIMITS,
over the network.
* **app** - *(optional)* An ASGI application to send requests to,
rather than sending actual network requests.
- * **backend** - *(optional)* A concurrency backend to use when issuing
- async requests. Either 'auto', 'asyncio', 'trio', or a `ConcurrencyBackend`
- instance. Defaults to 'auto', for autodetection.
* **trust_env** - *(optional)* Enables or disables usage of environment
variables for configuration.
* **uds** - *(optional)* A path to a Unix domain socket to connect through.
base_url: URLTypes = None,
dispatch: AsyncDispatcher = None,
app: typing.Callable = None,
- backend: typing.Union[str, ConcurrencyBackend] = "auto",
trust_env: bool = True,
uds: str = None,
):
pool_limits=pool_limits,
dispatch=dispatch,
app=app,
- backend=backend,
trust_env=trust_env,
uds=uds,
)
cert=cert,
http2=http2,
pool_limits=pool_limits,
- backend=backend,
trust_env=trust_env,
)
for key, proxy in proxy_map.items()
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
dispatch: AsyncDispatcher = None,
app: typing.Callable = None,
- backend: typing.Union[str, ConcurrencyBackend] = "auto",
trust_env: bool = True,
uds: str = None,
) -> AsyncDispatcher:
cert=cert,
http2=http2,
pool_limits=pool_limits,
- backend=backend,
trust_env=trust_env,
uds=uds,
)
cert: CertTypes = None,
http2: bool = False,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
- backend: typing.Union[str, ConcurrencyBackend] = "auto",
trust_env: bool = True,
) -> AsyncDispatcher:
return HTTPProxy(
cert=cert,
http2=http2,
pool_limits=pool_limits,
- backend=backend,
trust_env=trust_env,
)
assert response.status_code == 200
assert response.text == "Hello, world!"
assert response.encoding == "iso-8859-1"
-
-
-async def test_explicit_backend(server, async_environment):
- async with httpx.AsyncClient(backend=async_environment) as client:
- response = await client.get(server.url)
- assert response.status_code == 200
- assert response.text == "Hello, world!"
from h2.settings import SettingCodes
from httpx import AsyncClient, Response, TimeoutException
+from httpx._dispatch.connection_pool import ConnectionPool
from .utils import MockHTTP2Backend
@pytest.mark.asyncio
async def test_http2_get_request():
backend = MockHTTP2Backend(app=app)
+ dispatch = ConnectionPool(backend=backend, http2=True)
- async with AsyncClient(backend=backend, http2=True) as client:
+ async with AsyncClient(dispatch=dispatch) as client:
response = await client.get("http://example.org")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_http2_post_request():
backend = MockHTTP2Backend(app=app)
+ dispatch = ConnectionPool(backend=backend, http2=True)
- async with AsyncClient(backend=backend, http2=True) as client:
+ async with AsyncClient(dispatch=dispatch) as client:
response = await client.post("http://example.org", data=b"<data>")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_http2_large_post_request():
backend = MockHTTP2Backend(app=app)
+ dispatch = ConnectionPool(backend=backend, http2=True)
data = b"a" * 100000
- async with AsyncClient(backend=backend, http2=True) as client:
+ async with AsyncClient(dispatch=dispatch) as client:
response = await client.post("http://example.org", data=data)
assert response.status_code == 200
assert json.loads(response.content) == {
@pytest.mark.asyncio
async def test_http2_multiple_requests():
backend = MockHTTP2Backend(app=app)
+ dispatch = ConnectionPool(backend=backend, http2=True)
- async with AsyncClient(backend=backend, http2=True) as client:
+ async with AsyncClient(dispatch=dispatch) as client:
response_1 = await client.get("http://example.org/1")
response_2 = await client.get("http://example.org/2")
response_3 = await client.get("http://example.org/3")
be seemlessly reconnected.
"""
backend = MockHTTP2Backend(app=app)
+ dispatch = ConnectionPool(backend=backend, http2=True)
- async with AsyncClient(backend=backend, http2=True) as client:
+ async with AsyncClient(dispatch=dispatch) as client:
response_1 = await client.get("http://example.org/1")
backend.server.close_connection = True
response_2 = await client.get("http://example.org/2")
@pytest.mark.asyncio
async def test_http2_settings_in_handshake():
backend = MockHTTP2Backend(app=app)
+ dispatch = ConnectionPool(backend=backend, http2=True)
- async with AsyncClient(backend=backend, http2=True) as client:
+ async with AsyncClient(dispatch=dispatch) as client:
await client.get("http://example.org")
h2_conn = backend.server.conn
class MockHTTP2Backend:
- def __init__(self, app, backend="auto"):
+ def __init__(self, app):
self.app = app
- self.backend = lookup_backend(backend)
+ self.backend = lookup_backend()
self.server = None
async def open_tcp_stream(
class MockHTTP2Server(BaseSocketStream):
- def __init__(self, app, backend):
+ def __init__(self, app, backend: MockHTTP2Backend):
config = h2.config.H2Configuration(client_side=False)
self.conn = h2.connection.H2Connection(config=config)
self.app = app
class MockRawSocketBackend:
- def __init__(self, data_to_send=b"", backend="auto"):
- self.backend = lookup_backend(backend)
+ def __init__(self, data_to_send=b""):
+ self.backend = lookup_backend()
self.data_to_send = data_to_send
self.received_data = []
self.stream = MockRawSocketStream(self)