proxy.close()
def __enter__(self) -> "Client":
+ self._transport.__enter__()
+ for proxy in self._proxies.values():
+ if proxy is not None:
+ proxy.__enter__()
return self
def __exit__(
exc_value: BaseException = None,
traceback: TracebackType = None,
) -> None:
- self.close()
+ self._transport.__exit__(exc_type, exc_value, traceback)
+ for proxy in self._proxies.values():
+ if proxy is not None:
+ proxy.__exit__(exc_type, exc_value, traceback)
class AsyncClient(BaseClient):
await proxy.aclose()
async def __aenter__(self) -> "AsyncClient":
+ await self._transport.__aenter__()
+ for proxy in self._proxies.values():
+ if proxy is not None:
+ await proxy.__aenter__()
return self
async def __aexit__(
exc_value: BaseException = None,
traceback: TracebackType = None,
) -> None:
- await self.aclose()
+ await self._transport.__aexit__(exc_type, exc_value, traceback)
+ for proxy in self._proxies.values():
+ if proxy is not None:
+ await proxy.__aexit__(exc_type, exc_value, traceback)
class StreamContextManager:
from datetime import timedelta
+import httpcore
import pytest
import httpx
assert response.status_code == 200
assert response.content == data
+
+
+@pytest.mark.usefixtures("async_environment")
+async def test_context_managed_transport():
+ class Transport(httpcore.AsyncHTTPTransport):
+ def __init__(self):
+ self.events = []
+
+ async def aclose(self):
+ # The base implementation of httpcore.AsyncHTTPTransport just
+ # calls into `.aclose`, so simple transport cases can just override
+ # this method for any cleanup, where more complex cases
+ # might want to additionally override `__aenter__`/`__aexit__`.
+ self.events.append("transport.aclose")
+
+ async def __aenter__(self):
+ await super().__aenter__()
+ self.events.append("transport.__aenter__")
+
+ async def __aexit__(self, *args):
+ await super().__aexit__(*args)
+ self.events.append("transport.__aexit__")
+
+ # Note that we're including 'proxies' here to *also* run through the
+ # proxy context management, although we can't easily test that at the
+ # moment, since we can't add proxies as transport instances.
+ #
+ # Once we have a more generalised Mount API we'll be able to remove this
+ # in favour of ensuring all mounts are context managed, which will
+ # also neccessarily include proxies.
+ transport = Transport()
+ async with httpx.AsyncClient(transport=transport, proxies="http://www.example.com"):
+ pass
+
+ assert transport.events == [
+ "transport.__aenter__",
+ "transport.aclose",
+ "transport.__aexit__",
+ ]
from datetime import timedelta
+import httpcore
import pytest
import httpx
with pytest.warns(DeprecationWarning):
httpx.AsyncClient(pool_limits=limits)
+
+
+def test_context_managed_transport():
+ class Transport(httpcore.SyncHTTPTransport):
+ def __init__(self):
+ self.events = []
+
+ def close(self):
+ # The base implementation of httpcore.SyncHTTPTransport just
+ # calls into `.close`, so simple transport cases can just override
+ # this method for any cleanup, where more complex cases
+ # might want to additionally override `__enter__`/`__exit__`.
+ self.events.append("transport.close")
+
+ def __enter__(self):
+ super().__enter__()
+ self.events.append("transport.__enter__")
+
+ def __exit__(self, *args):
+ super().__exit__(*args)
+ self.events.append("transport.__exit__")
+
+ # Note that we're including 'proxies' here to *also* run through the
+ # proxy context management, although we can't easily test that at the
+ # moment, since we can't add proxies as transport instances.
+ #
+ # Once we have a more generalised Mount API we'll be able to remove this
+ # in favour of ensuring all mounts are context managed, which will
+ # also neccessarily include proxies.
+ transport = Transport()
+ with httpx.Client(transport=transport, proxies="http://www.example.com"):
+ pass
+
+ assert transport.events == [
+ "transport.__enter__",
+ "transport.close",
+ "transport.__exit__",
+ ]