* A requests-compatible API.
* HTTP/2 and HTTP/1.1 support.
-* Support for issuing HTTP requests in parallel.
-* Standard synchronous interface, but with `async`/`await` support if you need it.
+* Support for [issuing HTTP requests in parallel](https://www.encode.io/http3/parallel/). *(Coming soon)*
+* Standard synchronous interface, but [with `async`/`await` support if you need it](https://www.encode.io/http3/async/).
+* Ability to [make requests directly to WSGI or ASGI applications](https://www.encode.io/http3/advanced/#calling-into-python-web-apps).
* Strict timeouts everywhere.
* Fully type annotated.
* 100% test coverage.
* Keep-Alive & Connection Pooling
* Sessions with Cookie Persistence
* Browser-style SSL Verification
-* Basic/Digest Authentication *Digest is still TODO*
+* Basic/Digest Authentication *(Digest is still TODO)*
* Elegant Key/Value Cookies
* Automatic Decompression
* Automatic Content Decoding
* Unicode Response Bodies
* Multipart File Uploads
-* HTTP(S) Proxy Support *TODO*
+* HTTP(S) Proxy Support *(TODO)*
* Connection Timeouts
* Streaming Downloads
-* .netrc Support *TODO*
+* .netrc Support *(TODO)*
* Chunked Requests
## Installation
--- /dev/null
+# Advanced Usage
+
+## Client Instances
+
+Using a Client instance to make requests will give you HTTP connection pooling,
+will provide cookie persistence, and allows you to apply configuration across
+all outgoing requests.
+
+```python
+>>> client = http3.Client()
+>>> r = client.get('https://example.org/')
+>>> r
+<Response [200 OK]>
+```
+
+## Calling into Python Web Apps
+
+You can configure an `http3` client to call directly into a Python web
+application, using either the WSGI or ASGI protocol.
+
+This is particularly useful for two main use-cases:
+
+* Using `http3` as a client, inside test cases.
+* Mocking out external services, during tests or in dev/staging environments.
+
+Here's an example of integrating against a Flask application:
+
+```python
+from flask import Flask
+import http3
+
+
+app = Flask(__name__)
+
+@app.route("/")
+def hello():
+ return "Hello World!"
+
+client = http3.Client(app=app)
+r = client.get('http://example/')
+assert r.status_code == 200
+assert r.text == "Hello World!"
+```
>>> response = client.get('https://example.org')
```
-* `def __init__([auth], [cookies], [verify], [cert], [timeout], [pool_limits], [max_redirects], [dispatch])`
+* `def __init__([auth], [cookies], [verify], [cert], [timeout], [pool_limits], [max_redirects], [app], [dispatch])`
* `def .get(url, [params], [headers], [cookies], [auth], [stream], [allow_redirects], [verify], [cert], [timeout])`
* `def .options(url, [params], [headers], [cookies], [auth], [stream], [allow_redirects], [verify], [cert], [timeout])`
* `def .head(url, [params], [headers], [cookies], [auth], [stream], [allow_redirects], [verify], [cert], [timeout])`
and can provide significant performance benefits and enable the use of
long-lived network connections such as WebSockets.
-If you're working with an async web framework such as Sanic, Starlette, FastAPI,
-Responder or Bocadillo, then you'll also want to use an async client for sending
-outgoing HTTP requests.
+If you're working with an async web framework then you'll also want to use an
+async client for sending outgoing HTTP requests.
## Making Async requests
* A requests-compatible API.
* HTTP/2 and HTTP/1.1 support.
-* Support for issuing HTTP requests in parallel.
-* Standard synchronous interface, but with `async`/`await` support if you need it.
+* Support for [issuing HTTP requests in parallel](parallel.md). *(Coming soon)*
+* Standard synchronous interface, but [with `async`/`await` support if you need it](async.md).
+* Ability to [make requests directly to WSGI or ASGI applications](advanced.md#calling-into-python-web-apps).
* Strict timeouts everywhere.
* Fully type annotated.
* 100% test coverage.
* Keep-Alive & Connection Pooling
* Sessions with Cookie Persistence
* Browser-style SSL Verification
-* Basic/Digest Authentication *Digest is still TODO*
+* Basic/Digest Authentication *(Digest is still TODO)*
* Elegant Key/Value Cookies
* Automatic Decompression
* Automatic Content Decoding
* Unicode Response Bodies
* Multipart File Uploads
-* HTTP(S) Proxy Support *TODO*
+* HTTP(S) Proxy Support *(TODO)*
* Connection Timeouts
* Streaming Downloads
-* .netrc Support *TODO*
+* .netrc Support *(TODO)*
* Chunked Requests
## Documentation
For a run-through of all the basics, head over to the [QuickStart](quickstart.md).
-For more advanced topics, see the [Parallel Requests](parallel.md) or [Async Client](async.md) documentation.
+For more advanced topics, see the [Advanced Usage](advanced.md) section, or
+the specific topics on making [Parallel Requests](parallel.md) or using the
+[Async Client](async.md).
The [Developer Interface](api.md) provides a comprehensive API reference.
+import inspect
import typing
from types import TracebackType
TimeoutTypes,
VerifyTypes,
)
+from .dispatch.asgi import ASGIDispatch
from .dispatch.connection_pool import ConnectionPool
from .dispatch.threaded import ThreadedDispatcher
+from .dispatch.wsgi import WSGIDispatch
from .exceptions import RedirectBodyUnavailable, RedirectLoop, TooManyRedirects
from .interfaces import AsyncDispatcher, ConcurrencyBackend, Dispatcher
from .models import (
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
dispatch: typing.Union[AsyncDispatcher, Dispatcher] = None,
+ app: typing.Callable = None,
backend: ConcurrencyBackend = None,
):
if backend is None:
backend = AsyncioBackend()
+ if app is not None:
+ param_count = len(inspect.signature(app).parameters)
+ assert param_count in (2, 3)
+ if param_count == 2:
+ dispatch = WSGIDispatch(app=app)
+ else:
+ dispatch = ASGIDispatch(app=app)
+
if dispatch is None:
async_dispatch = ConnectionPool(
verify=verify,
--- /dev/null
+import asyncio
+import typing
+
+from ..config import CertTypes, TimeoutTypes, VerifyTypes
+from ..interfaces import AsyncDispatcher
+from ..models import AsyncRequest, AsyncResponse
+
+
+class ASGIDispatch(AsyncDispatcher):
+ """
+ A custom dispatcher that handles sending requests directly to an ASGI app.
+
+ The simplest way to use this functionality is to use the `app`argument.
+ This will automatically infer if 'app' is a WSGI or an ASGI application,
+ and will setup an appropriate dispatch class:
+
+ ```
+ client = http3.Client(app=app)
+ ```
+
+ Alternatively, you can setup the dispatch instance explicitly.
+ This allows you to include any additional configuration arguments specific
+ to the ASGIDispatch class:
+
+ ```
+ dispatch = http3.ASGIDispatch(
+ app=app,
+ root_path="/submount",
+ client=("1.2.3.4", 123)
+ )
+ client = http3.Client(dispatch=dispatch)
+ ```
+ """
+
+ def __init__(
+ self,
+ app: typing.Callable,
+ root_path: str = "",
+ client: typing.Tuple[str, int] = ("127.0.0.1", 123),
+ ) -> None:
+ self.app = app
+ self.root_path = root_path
+ self.client = client
+
+ async def send(
+ self,
+ request: AsyncRequest,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
+ ) -> AsyncResponse:
+
+ scope = {
+ "type": "http",
+ "asgi": {"version": "3.0"},
+ "method": request.method,
+ "headers": request.headers.raw,
+ "scheme": request.url.scheme,
+ "path": request.url.path,
+ "query": request.url.query.encode("ascii"),
+ "server": request.url.host,
+ "client": self.client,
+ "root_path": self.root_path,
+ }
+ app = self.app
+ app_exc = None
+ status_code = None
+ headers = None
+ response_started = asyncio.Event()
+ response_body = BodyIterator()
+ request_stream = request.stream()
+
+ async def receive() -> dict:
+ nonlocal request_stream
+
+ try:
+ body = await request_stream.__anext__()
+ except StopAsyncIteration:
+ return {"type": "http.request", "body": b"", "more_body": False}
+ return {"type": "http.request", "body": body, "more_body": True}
+
+ async def send(message: dict) -> None:
+ nonlocal status_code, headers, response_started, response_body
+
+ if message["type"] == "http.response.start":
+ status_code = message["status"]
+ headers = message.get("headers", [])
+ response_started.set()
+ elif message["type"] == "http.response.body":
+ body = message.get("body", b"")
+ more_body = message.get("more_body", False)
+ if body:
+ await response_body.put(body)
+ if not more_body:
+ await response_body.done()
+
+ async def run_app() -> None:
+ nonlocal app, scope, receive, send, app_exc, response_body
+
+ try:
+ await app(scope, receive, send)
+ except Exception as exc:
+ app_exc = exc
+ finally:
+ await response_body.done()
+
+ # Really we'd like to push all `asyncio` logic into concurrency.py,
+ # with a standardized interface, so that we can support other event
+ # loop implementations, such as Trio and Curio.
+ # That's a bit fiddly here, so we're not yet supporting using a custom
+ # `ConcurrencyBackend` with the `Client(app=asgi_app)` case.
+ loop = asyncio.get_event_loop()
+ app_task = loop.create_task(run_app())
+ response_task = loop.create_task(response_started.wait())
+
+ tasks = {app_task, response_task} # type: typing.Set[asyncio.Task]
+
+ await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+
+ if app_exc is not None:
+ raise app_exc
+
+ assert response_started.is_set, "application did not return a response."
+ assert status_code is not None
+ assert headers is not None
+
+ async def on_close() -> None:
+ nonlocal app_task
+ await app_task
+ if app_exc is not None:
+ raise app_exc
+
+ return AsyncResponse(
+ status_code=status_code,
+ protocol="HTTP/1.1",
+ headers=headers,
+ content=response_body.iterate(),
+ on_close=on_close,
+ request=request,
+ )
+
+
+class BodyIterator:
+ """
+ Provides a byte-iterator interface that the client can use to
+ ingest the response content from.
+ """
+
+ def __init__(self) -> None:
+ self._queue = asyncio.Queue(
+ maxsize=1
+ ) # type: asyncio.Queue[typing.Union[bytes, object]]
+ self._done = object()
+
+ async def iterate(self) -> typing.AsyncIterator[bytes]:
+ """
+ A byte-iterator, used by the client to consume the response body.
+ """
+ while True:
+ data = await self._queue.get()
+ if data is self._done:
+ break
+ assert isinstance(data, bytes)
+ yield data
+
+ async def put(self, data: bytes) -> None:
+ """
+ Used by the server to add data to the response body.
+ """
+ await self._queue.put(data)
+
+ async def done(self) -> None:
+ """
+ Used by the server to signal the end of the response body.
+ """
+ await self._queue.put(self._done)
--- /dev/null
+import io
+import typing
+
+from ..config import CertTypes, TimeoutTypes, VerifyTypes
+from ..interfaces import Dispatcher
+from ..models import Request, Response
+
+
+class WSGIDispatch(Dispatcher):
+ """
+ A custom dispatcher that handles sending requests directly to an ASGI app.
+
+ The simplest way to use this functionality is to use the `app`argument.
+ This will automatically infer if 'app' is a WSGI or an ASGI application,
+ and will setup an appropriate dispatch class:
+
+ ```
+ client = http3.Client(app=app)
+ ```
+
+ Alternatively, you can setup the dispatch instance explicitly.
+ This allows you to include any additional configuration arguments specific
+ to the WSGIDispatch class:
+
+ ```
+ dispatch = http3.WSGIDispatch(
+ app=app,
+ script_name="/submount",
+ remote_addr="1.2.3.4"
+ )
+ client = http3.Client(dispatch=dispatch)
+ ```
+ """
+
+ def __init__(
+ self,
+ app: typing.Callable,
+ script_name: str = "",
+ remote_addr: str = "127.0.0.1",
+ ) -> None:
+ self.app = app
+ self.script_name = script_name
+ self.remote_addr = remote_addr
+
+ def send(
+ self,
+ request: Request,
+ verify: VerifyTypes = None,
+ cert: CertTypes = None,
+ timeout: TimeoutTypes = None,
+ ) -> Response:
+ environ = {
+ "wsgi.version": (1, 0),
+ "wsgi.url_scheme": request.url.scheme,
+ "wsgi.input": BodyStream(request.stream()),
+ "wsgi.errors": io.BytesIO(),
+ "wsgi.multithread": True,
+ "wsgi.multiprocess": False,
+ "wsgi.run_once": False,
+ "REQUEST_METHOD": request.method,
+ "SCRIPT_NAME": self.script_name,
+ "PATH_INFO": request.url.path,
+ "QUERY_STRING": request.url.query,
+ "SERVER_NAME": request.url.host,
+ "SERVER_PORT": str(request.url.port),
+ "REMOTE_ADDR": self.remote_addr,
+ }
+ for key, value in request.headers.items():
+ key = key.upper().replace("-", "_")
+ if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
+ key = "HTTP_" + key
+ environ[key] = value
+
+ seen_status = None
+ seen_response_headers = None
+ seen_exc_info = None
+
+ def start_response(
+ status: str, response_headers: list, exc_info: typing.Any = None
+ ) -> None:
+ nonlocal seen_status, seen_response_headers, seen_exc_info
+ seen_status = status
+ seen_response_headers = response_headers
+ seen_exc_info = exc_info
+
+ result = self.app(environ, start_response)
+
+ assert seen_status is not None
+ assert seen_response_headers is not None
+ if seen_exc_info:
+ raise seen_exc_info[1]
+
+ return Response(
+ status_code=int(seen_status.split()[0]),
+ protocol="HTTP/1.1",
+ headers=seen_response_headers,
+ content=(chunk for chunk in result),
+ on_close=getattr(result, "close", None),
+ )
+
+
+class BodyStream(io.RawIOBase):
+ def __init__(self, iterator: typing.Iterator[bytes]) -> None:
+ self._iterator = iterator
+ self._buffer = b""
+ self._closed = False
+
+ def read(self, size: int = -1) -> bytes:
+ if self._closed:
+ return b""
+
+ if size == -1:
+ return self.readall()
+
+ try:
+ while len(self._buffer) < size:
+ self._buffer += next(self._iterator)
+ except StopIteration:
+ self._closed = True
+ return self._buffer
+
+ output = self._buffer[:size]
+ self._buffer = self._buffer[size:]
+ return output
+
+ def readall(self) -> bytes:
+ if self._closed:
+ raise OSError("Stream closed") # pragma: nocover
+
+ for chunk in self._iterator:
+ self._buffer += chunk
+
+ self._closed = True
+ return self._buffer
+
+ def readinto(self, b: bytearray) -> typing.Optional[int]: # pragma: nocover
+ output = self.read(len(b))
+ count = len(output)
+ b[:count] = output
+ return count
+
+ def write(self, b: bytes) -> int:
+ raise OSError("Operation not supported") # pragma: nocover
+
+ def fileno(self) -> int:
+ raise OSError("Operation not supported") # pragma: nocover
+
+ def seek(self, offset: int, whence: int = 0) -> int:
+ raise OSError("Operation not supported") # pragma: nocover
+
+ def truncate(self, size: int = None) -> int:
+ raise OSError("Operation not supported") # pragma: nocover
nav:
- Introduction: 'index.md'
- QuickStart: 'quickstart.md'
+ - Advanced Usage: 'advanced.md'
- Parallel Requests: 'parallel.md'
- Async Client: 'async.md'
- Requests Compatibility: 'compatibility.md'
--- /dev/null
+import pytest
+
+import http3
+
+
+async def hello_world(scope, receive, send):
+ status = 200
+ output = b"Hello, World!"
+ headers = [(b"content-type", "text/plain"), (b"content-length", str(len(output)))]
+
+ await send({"type": "http.response.start", "status": status, "headers": headers})
+ await send({"type": "http.response.body", "body": output})
+
+
+async def echo_body(scope, receive, send):
+ status = 200
+ headers = [(b"content-type", "text/plain")]
+
+ await send({"type": "http.response.start", "status": status, "headers": headers})
+ more_body = True
+ while more_body:
+ message = await receive()
+ body = message.get("body", b"")
+ more_body = message.get("more_body", False)
+ await send({"type": "http.response.body", "body": body, "more_body": more_body})
+
+
+async def raise_exc(scope, receive, send):
+ raise ValueError()
+
+
+async def raise_exc_after_response(scope, receive, send):
+ status = 200
+ output = b"Hello, World!"
+ headers = [(b"content-type", "text/plain"), (b"content-length", str(len(output)))]
+
+ await send({"type": "http.response.start", "status": status, "headers": headers})
+ await send({"type": "http.response.body", "body": output})
+ raise ValueError()
+
+
+def test_asgi():
+ client = http3.Client(app=hello_world)
+ response = client.get("http://www.example.org/")
+ assert response.status_code == 200
+ assert response.text == "Hello, World!"
+
+
+def test_asgi_upload():
+ client = http3.Client(app=echo_body)
+ response = client.post("http://www.example.org/", data=b"example")
+ assert response.status_code == 200
+ assert response.text == "example"
+
+
+def test_asgi_exc():
+ client = http3.Client(app=raise_exc)
+ with pytest.raises(ValueError):
+ response = client.get("http://www.example.org/")
+
+
+def test_asgi_exc_after_response():
+ client = http3.Client(app=raise_exc_after_response)
+ with pytest.raises(ValueError):
+ response = client.get("http://www.example.org/")
--- /dev/null
+import sys
+
+import pytest
+
+import http3
+
+
+def hello_world(environ, start_response):
+ status = "200 OK"
+ output = b"Hello, World!"
+
+ response_headers = [
+ ("Content-type", "text/plain"),
+ ("Content-Length", str(len(output))),
+ ]
+
+ start_response(status, response_headers)
+
+ return [output]
+
+
+def echo_body(environ, start_response):
+ status = "200 OK"
+ output = environ["wsgi.input"].read()
+
+ response_headers = [
+ ("Content-type", "text/plain"),
+ ("Content-Length", str(len(output))),
+ ]
+
+ start_response(status, response_headers)
+
+ return [output]
+
+
+def echo_body_with_response_stream(environ, start_response):
+ status = "200 OK"
+
+ response_headers = [("Content-Type", "text/plain")]
+
+ start_response(status, response_headers)
+
+ def output_generator(f):
+ while True:
+ output = f.read(2)
+ if not output:
+ break
+ yield output
+
+ return output_generator(f=environ["wsgi.input"])
+
+
+def raise_exc(environ, start_response):
+ status = "500 Server Error"
+ output = b"Nope!"
+
+ response_headers = [
+ ("Content-type", "text/plain"),
+ ("Content-Length", str(len(output))),
+ ]
+
+ try:
+ raise ValueError()
+ except:
+ exc_info = sys.exc_info()
+ start_response(status, response_headers, exc_info=exc_info)
+
+ return [output]
+
+
+def test_wsgi():
+ client = http3.Client(app=hello_world)
+ response = client.get("http://www.example.org/")
+ assert response.status_code == 200
+ assert response.text == "Hello, World!"
+
+
+def test_wsgi_upload():
+ client = http3.Client(app=echo_body)
+ response = client.post("http://www.example.org/", data=b"example")
+ assert response.status_code == 200
+ assert response.text == "example"
+
+
+def test_wsgi_upload_with_response_stream():
+ client = http3.Client(app=echo_body_with_response_stream)
+ response = client.post("http://www.example.org/", data=b"example")
+ assert response.status_code == 200
+ assert response.text == "example"
+
+
+def test_wsgi_exc():
+ client = http3.Client(app=raise_exc)
+ with pytest.raises(ValueError):
+ response = client.get("http://www.example.org/")