From: Tom Christie Date: Tue, 18 Jun 2019 14:53:33 +0000 (+0100) Subject: Support WSGI and ASGI apps. (#94) X-Git-Tag: 0.5.0~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bb8697011d1b7f525131e11226818fef399c0f78;p=thirdparty%2Fhttpx.git Support WSGI and ASGI apps. (#94) * Add 'Client(app=...)' support * Add ASGI support with Client(app=app) * Improve exc cases when using client with ASGI * Tighten up exception handling when using as a test client * Fix up stream I/O for WSGI/ASGI apps * Docs for WSGI/ASGI integration --- diff --git a/README.md b/README.md index c8ce9046..66f67eef 100644 --- a/README.md +++ b/README.md @@ -39,8 +39,9 @@ HTTP3 builds on the well-established usability of `requests`, and gives you: * A requests-compatible API. * HTTP/2 and HTTP/1.1 support. -* Support for issuing HTTP requests in parallel. -* Standard synchronous interface, but with `async`/`await` support if you need it. +* Support for [issuing HTTP requests in parallel](https://www.encode.io/http3/parallel/). *(Coming soon)* +* Standard synchronous interface, but [with `async`/`await` support if you need it](https://www.encode.io/http3/async/). +* Ability to [make requests directly to WSGI or ASGI applications](https://www.encode.io/http3/advanced/#calling-into-python-web-apps). * Strict timeouts everywhere. * Fully type annotated. * 100% test coverage. @@ -51,16 +52,16 @@ Plus all the standard features of `requests`... * Keep-Alive & Connection Pooling * Sessions with Cookie Persistence * Browser-style SSL Verification -* Basic/Digest Authentication *Digest is still TODO* +* Basic/Digest Authentication *(Digest is still TODO)* * Elegant Key/Value Cookies * Automatic Decompression * Automatic Content Decoding * Unicode Response Bodies * Multipart File Uploads -* HTTP(S) Proxy Support *TODO* +* HTTP(S) Proxy Support *(TODO)* * Connection Timeouts * Streaming Downloads -* .netrc Support *TODO* +* .netrc Support *(TODO)* * Chunked Requests ## Installation diff --git a/docs/advanced.md b/docs/advanced.md new file mode 100644 index 00000000..e9a54430 --- /dev/null +++ b/docs/advanced.md @@ -0,0 +1,43 @@ +# Advanced Usage + +## Client Instances + +Using a Client instance to make requests will give you HTTP connection pooling, +will provide cookie persistence, and allows you to apply configuration across +all outgoing requests. + +```python +>>> client = http3.Client() +>>> r = client.get('https://example.org/') +>>> r + +``` + +## Calling into Python Web Apps + +You can configure an `http3` client to call directly into a Python web +application, using either the WSGI or ASGI protocol. + +This is particularly useful for two main use-cases: + +* Using `http3` as a client, inside test cases. +* Mocking out external services, during tests or in dev/staging environments. + +Here's an example of integrating against a Flask application: + +```python +from flask import Flask +import http3 + + +app = Flask(__name__) + +@app.route("/") +def hello(): + return "Hello World!" + +client = http3.Client(app=app) +r = client.get('http://example/') +assert r.status_code == 200 +assert r.text == "Hello World!" +``` diff --git a/docs/api.md b/docs/api.md index 1e55aa8c..3b9e4f18 100644 --- a/docs/api.md +++ b/docs/api.md @@ -20,7 +20,7 @@ >>> response = client.get('https://example.org') ``` -* `def __init__([auth], [cookies], [verify], [cert], [timeout], [pool_limits], [max_redirects], [dispatch])` +* `def __init__([auth], [cookies], [verify], [cert], [timeout], [pool_limits], [max_redirects], [app], [dispatch])` * `def .get(url, [params], [headers], [cookies], [auth], [stream], [allow_redirects], [verify], [cert], [timeout])` * `def .options(url, [params], [headers], [cookies], [auth], [stream], [allow_redirects], [verify], [cert], [timeout])` * `def .head(url, [params], [headers], [cookies], [auth], [stream], [allow_redirects], [verify], [cert], [timeout])` diff --git a/docs/async.md b/docs/async.md index 7822d35d..0303d745 100644 --- a/docs/async.md +++ b/docs/async.md @@ -7,9 +7,8 @@ Async is a concurrency model that is far more efficient than multi-threading, and can provide significant performance benefits and enable the use of long-lived network connections such as WebSockets. -If you're working with an async web framework such as Sanic, Starlette, FastAPI, -Responder or Bocadillo, then you'll also want to use an async client for sending -outgoing HTTP requests. +If you're working with an async web framework then you'll also want to use an +async client for sending outgoing HTTP requests. ## Making Async requests diff --git a/docs/index.md b/docs/index.md index e65da7fe..f34504b4 100644 --- a/docs/index.md +++ b/docs/index.md @@ -41,8 +41,9 @@ HTTP3 builds on the well-established usability of `requests`, and gives you: * A requests-compatible API. * HTTP/2 and HTTP/1.1 support. -* Support for issuing HTTP requests in parallel. -* Standard synchronous interface, but with `async`/`await` support if you need it. +* Support for [issuing HTTP requests in parallel](parallel.md). *(Coming soon)* +* Standard synchronous interface, but [with `async`/`await` support if you need it](async.md). +* Ability to [make requests directly to WSGI or ASGI applications](advanced.md#calling-into-python-web-apps). * Strict timeouts everywhere. * Fully type annotated. * 100% test coverage. @@ -53,23 +54,25 @@ Plus all the standard features of `requests`... * Keep-Alive & Connection Pooling * Sessions with Cookie Persistence * Browser-style SSL Verification -* Basic/Digest Authentication *Digest is still TODO* +* Basic/Digest Authentication *(Digest is still TODO)* * Elegant Key/Value Cookies * Automatic Decompression * Automatic Content Decoding * Unicode Response Bodies * Multipart File Uploads -* HTTP(S) Proxy Support *TODO* +* HTTP(S) Proxy Support *(TODO)* * Connection Timeouts * Streaming Downloads -* .netrc Support *TODO* +* .netrc Support *(TODO)* * Chunked Requests ## Documentation For a run-through of all the basics, head over to the [QuickStart](quickstart.md). -For more advanced topics, see the [Parallel Requests](parallel.md) or [Async Client](async.md) documentation. +For more advanced topics, see the [Advanced Usage](advanced.md) section, or +the specific topics on making [Parallel Requests](parallel.md) or using the +[Async Client](async.md). The [Developer Interface](api.md) provides a comprehensive API reference. diff --git a/http3/client.py b/http3/client.py index fd45de54..fd92ef32 100644 --- a/http3/client.py +++ b/http3/client.py @@ -1,3 +1,4 @@ +import inspect import typing from types import TracebackType @@ -12,8 +13,10 @@ from .config import ( TimeoutTypes, VerifyTypes, ) +from .dispatch.asgi import ASGIDispatch from .dispatch.connection_pool import ConnectionPool from .dispatch.threaded import ThreadedDispatcher +from .dispatch.wsgi import WSGIDispatch from .exceptions import RedirectBodyUnavailable, RedirectLoop, TooManyRedirects from .interfaces import AsyncDispatcher, ConcurrencyBackend, Dispatcher from .models import ( @@ -49,11 +52,20 @@ class BaseClient: pool_limits: PoolLimits = DEFAULT_POOL_LIMITS, max_redirects: int = DEFAULT_MAX_REDIRECTS, dispatch: typing.Union[AsyncDispatcher, Dispatcher] = None, + app: typing.Callable = None, backend: ConcurrencyBackend = None, ): if backend is None: backend = AsyncioBackend() + if app is not None: + param_count = len(inspect.signature(app).parameters) + assert param_count in (2, 3) + if param_count == 2: + dispatch = WSGIDispatch(app=app) + else: + dispatch = ASGIDispatch(app=app) + if dispatch is None: async_dispatch = ConnectionPool( verify=verify, diff --git a/http3/dispatch/asgi.py b/http3/dispatch/asgi.py new file mode 100644 index 00000000..da155a10 --- /dev/null +++ b/http3/dispatch/asgi.py @@ -0,0 +1,176 @@ +import asyncio +import typing + +from ..config import CertTypes, TimeoutTypes, VerifyTypes +from ..interfaces import AsyncDispatcher +from ..models import AsyncRequest, AsyncResponse + + +class ASGIDispatch(AsyncDispatcher): + """ + A custom dispatcher that handles sending requests directly to an ASGI app. + + The simplest way to use this functionality is to use the `app`argument. + This will automatically infer if 'app' is a WSGI or an ASGI application, + and will setup an appropriate dispatch class: + + ``` + client = http3.Client(app=app) + ``` + + Alternatively, you can setup the dispatch instance explicitly. + This allows you to include any additional configuration arguments specific + to the ASGIDispatch class: + + ``` + dispatch = http3.ASGIDispatch( + app=app, + root_path="/submount", + client=("1.2.3.4", 123) + ) + client = http3.Client(dispatch=dispatch) + ``` + """ + + def __init__( + self, + app: typing.Callable, + root_path: str = "", + client: typing.Tuple[str, int] = ("127.0.0.1", 123), + ) -> None: + self.app = app + self.root_path = root_path + self.client = client + + async def send( + self, + request: AsyncRequest, + verify: VerifyTypes = None, + cert: CertTypes = None, + timeout: TimeoutTypes = None, + ) -> AsyncResponse: + + scope = { + "type": "http", + "asgi": {"version": "3.0"}, + "method": request.method, + "headers": request.headers.raw, + "scheme": request.url.scheme, + "path": request.url.path, + "query": request.url.query.encode("ascii"), + "server": request.url.host, + "client": self.client, + "root_path": self.root_path, + } + app = self.app + app_exc = None + status_code = None + headers = None + response_started = asyncio.Event() + response_body = BodyIterator() + request_stream = request.stream() + + async def receive() -> dict: + nonlocal request_stream + + try: + body = await request_stream.__anext__() + except StopAsyncIteration: + return {"type": "http.request", "body": b"", "more_body": False} + return {"type": "http.request", "body": body, "more_body": True} + + async def send(message: dict) -> None: + nonlocal status_code, headers, response_started, response_body + + if message["type"] == "http.response.start": + status_code = message["status"] + headers = message.get("headers", []) + response_started.set() + elif message["type"] == "http.response.body": + body = message.get("body", b"") + more_body = message.get("more_body", False) + if body: + await response_body.put(body) + if not more_body: + await response_body.done() + + async def run_app() -> None: + nonlocal app, scope, receive, send, app_exc, response_body + + try: + await app(scope, receive, send) + except Exception as exc: + app_exc = exc + finally: + await response_body.done() + + # Really we'd like to push all `asyncio` logic into concurrency.py, + # with a standardized interface, so that we can support other event + # loop implementations, such as Trio and Curio. + # That's a bit fiddly here, so we're not yet supporting using a custom + # `ConcurrencyBackend` with the `Client(app=asgi_app)` case. + loop = asyncio.get_event_loop() + app_task = loop.create_task(run_app()) + response_task = loop.create_task(response_started.wait()) + + tasks = {app_task, response_task} # type: typing.Set[asyncio.Task] + + await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + if app_exc is not None: + raise app_exc + + assert response_started.is_set, "application did not return a response." + assert status_code is not None + assert headers is not None + + async def on_close() -> None: + nonlocal app_task + await app_task + if app_exc is not None: + raise app_exc + + return AsyncResponse( + status_code=status_code, + protocol="HTTP/1.1", + headers=headers, + content=response_body.iterate(), + on_close=on_close, + request=request, + ) + + +class BodyIterator: + """ + Provides a byte-iterator interface that the client can use to + ingest the response content from. + """ + + def __init__(self) -> None: + self._queue = asyncio.Queue( + maxsize=1 + ) # type: asyncio.Queue[typing.Union[bytes, object]] + self._done = object() + + async def iterate(self) -> typing.AsyncIterator[bytes]: + """ + A byte-iterator, used by the client to consume the response body. + """ + while True: + data = await self._queue.get() + if data is self._done: + break + assert isinstance(data, bytes) + yield data + + async def put(self, data: bytes) -> None: + """ + Used by the server to add data to the response body. + """ + await self._queue.put(data) + + async def done(self) -> None: + """ + Used by the server to signal the end of the response body. + """ + await self._queue.put(self._done) diff --git a/http3/dispatch/wsgi.py b/http3/dispatch/wsgi.py new file mode 100644 index 00000000..01c1aae1 --- /dev/null +++ b/http3/dispatch/wsgi.py @@ -0,0 +1,152 @@ +import io +import typing + +from ..config import CertTypes, TimeoutTypes, VerifyTypes +from ..interfaces import Dispatcher +from ..models import Request, Response + + +class WSGIDispatch(Dispatcher): + """ + A custom dispatcher that handles sending requests directly to an ASGI app. + + The simplest way to use this functionality is to use the `app`argument. + This will automatically infer if 'app' is a WSGI or an ASGI application, + and will setup an appropriate dispatch class: + + ``` + client = http3.Client(app=app) + ``` + + Alternatively, you can setup the dispatch instance explicitly. + This allows you to include any additional configuration arguments specific + to the WSGIDispatch class: + + ``` + dispatch = http3.WSGIDispatch( + app=app, + script_name="/submount", + remote_addr="1.2.3.4" + ) + client = http3.Client(dispatch=dispatch) + ``` + """ + + def __init__( + self, + app: typing.Callable, + script_name: str = "", + remote_addr: str = "127.0.0.1", + ) -> None: + self.app = app + self.script_name = script_name + self.remote_addr = remote_addr + + def send( + self, + request: Request, + verify: VerifyTypes = None, + cert: CertTypes = None, + timeout: TimeoutTypes = None, + ) -> Response: + environ = { + "wsgi.version": (1, 0), + "wsgi.url_scheme": request.url.scheme, + "wsgi.input": BodyStream(request.stream()), + "wsgi.errors": io.BytesIO(), + "wsgi.multithread": True, + "wsgi.multiprocess": False, + "wsgi.run_once": False, + "REQUEST_METHOD": request.method, + "SCRIPT_NAME": self.script_name, + "PATH_INFO": request.url.path, + "QUERY_STRING": request.url.query, + "SERVER_NAME": request.url.host, + "SERVER_PORT": str(request.url.port), + "REMOTE_ADDR": self.remote_addr, + } + for key, value in request.headers.items(): + key = key.upper().replace("-", "_") + if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"): + key = "HTTP_" + key + environ[key] = value + + seen_status = None + seen_response_headers = None + seen_exc_info = None + + def start_response( + status: str, response_headers: list, exc_info: typing.Any = None + ) -> None: + nonlocal seen_status, seen_response_headers, seen_exc_info + seen_status = status + seen_response_headers = response_headers + seen_exc_info = exc_info + + result = self.app(environ, start_response) + + assert seen_status is not None + assert seen_response_headers is not None + if seen_exc_info: + raise seen_exc_info[1] + + return Response( + status_code=int(seen_status.split()[0]), + protocol="HTTP/1.1", + headers=seen_response_headers, + content=(chunk for chunk in result), + on_close=getattr(result, "close", None), + ) + + +class BodyStream(io.RawIOBase): + def __init__(self, iterator: typing.Iterator[bytes]) -> None: + self._iterator = iterator + self._buffer = b"" + self._closed = False + + def read(self, size: int = -1) -> bytes: + if self._closed: + return b"" + + if size == -1: + return self.readall() + + try: + while len(self._buffer) < size: + self._buffer += next(self._iterator) + except StopIteration: + self._closed = True + return self._buffer + + output = self._buffer[:size] + self._buffer = self._buffer[size:] + return output + + def readall(self) -> bytes: + if self._closed: + raise OSError("Stream closed") # pragma: nocover + + for chunk in self._iterator: + self._buffer += chunk + + self._closed = True + return self._buffer + + def readinto(self, b: bytearray) -> typing.Optional[int]: # pragma: nocover + output = self.read(len(b)) + count = len(output) + b[:count] = output + return count + + def write(self, b: bytes) -> int: + raise OSError("Operation not supported") # pragma: nocover + + def fileno(self) -> int: + raise OSError("Operation not supported") # pragma: nocover + + def seek(self, offset: int, whence: int = 0) -> int: + raise OSError("Operation not supported") # pragma: nocover + + def truncate(self, size: int = None) -> int: + raise OSError("Operation not supported") # pragma: nocover diff --git a/mkdocs.yml b/mkdocs.yml index 2e80f344..d9d3eb3e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -11,6 +11,7 @@ edit_uri: "" nav: - Introduction: 'index.md' - QuickStart: 'quickstart.md' + - Advanced Usage: 'advanced.md' - Parallel Requests: 'parallel.md' - Async Client: 'async.md' - Requests Compatibility: 'compatibility.md' diff --git a/tests/test_asgi.py b/tests/test_asgi.py new file mode 100644 index 00000000..79318b9e --- /dev/null +++ b/tests/test_asgi.py @@ -0,0 +1,65 @@ +import pytest + +import http3 + + +async def hello_world(scope, receive, send): + status = 200 + output = b"Hello, World!" + headers = [(b"content-type", "text/plain"), (b"content-length", str(len(output)))] + + await send({"type": "http.response.start", "status": status, "headers": headers}) + await send({"type": "http.response.body", "body": output}) + + +async def echo_body(scope, receive, send): + status = 200 + headers = [(b"content-type", "text/plain")] + + await send({"type": "http.response.start", "status": status, "headers": headers}) + more_body = True + while more_body: + message = await receive() + body = message.get("body", b"") + more_body = message.get("more_body", False) + await send({"type": "http.response.body", "body": body, "more_body": more_body}) + + +async def raise_exc(scope, receive, send): + raise ValueError() + + +async def raise_exc_after_response(scope, receive, send): + status = 200 + output = b"Hello, World!" + headers = [(b"content-type", "text/plain"), (b"content-length", str(len(output)))] + + await send({"type": "http.response.start", "status": status, "headers": headers}) + await send({"type": "http.response.body", "body": output}) + raise ValueError() + + +def test_asgi(): + client = http3.Client(app=hello_world) + response = client.get("http://www.example.org/") + assert response.status_code == 200 + assert response.text == "Hello, World!" + + +def test_asgi_upload(): + client = http3.Client(app=echo_body) + response = client.post("http://www.example.org/", data=b"example") + assert response.status_code == 200 + assert response.text == "example" + + +def test_asgi_exc(): + client = http3.Client(app=raise_exc) + with pytest.raises(ValueError): + response = client.get("http://www.example.org/") + + +def test_asgi_exc_after_response(): + client = http3.Client(app=raise_exc_after_response) + with pytest.raises(ValueError): + response = client.get("http://www.example.org/") diff --git a/tests/test_wsgi.py b/tests/test_wsgi.py new file mode 100644 index 00000000..4a277c9a --- /dev/null +++ b/tests/test_wsgi.py @@ -0,0 +1,95 @@ +import sys + +import pytest + +import http3 + + +def hello_world(environ, start_response): + status = "200 OK" + output = b"Hello, World!" + + response_headers = [ + ("Content-type", "text/plain"), + ("Content-Length", str(len(output))), + ] + + start_response(status, response_headers) + + return [output] + + +def echo_body(environ, start_response): + status = "200 OK" + output = environ["wsgi.input"].read() + + response_headers = [ + ("Content-type", "text/plain"), + ("Content-Length", str(len(output))), + ] + + start_response(status, response_headers) + + return [output] + + +def echo_body_with_response_stream(environ, start_response): + status = "200 OK" + + response_headers = [("Content-Type", "text/plain")] + + start_response(status, response_headers) + + def output_generator(f): + while True: + output = f.read(2) + if not output: + break + yield output + + return output_generator(f=environ["wsgi.input"]) + + +def raise_exc(environ, start_response): + status = "500 Server Error" + output = b"Nope!" + + response_headers = [ + ("Content-type", "text/plain"), + ("Content-Length", str(len(output))), + ] + + try: + raise ValueError() + except: + exc_info = sys.exc_info() + start_response(status, response_headers, exc_info=exc_info) + + return [output] + + +def test_wsgi(): + client = http3.Client(app=hello_world) + response = client.get("http://www.example.org/") + assert response.status_code == 200 + assert response.text == "Hello, World!" + + +def test_wsgi_upload(): + client = http3.Client(app=echo_body) + response = client.post("http://www.example.org/", data=b"example") + assert response.status_code == 200 + assert response.text == "example" + + +def test_wsgi_upload_with_response_stream(): + client = http3.Client(app=echo_body_with_response_stream) + response = client.post("http://www.example.org/", data=b"example") + assert response.status_code == 200 + assert response.text == "example" + + +def test_wsgi_exc(): + client = http3.Client(app=raise_exc) + with pytest.raises(ValueError): + response = client.get("http://www.example.org/")