def request_response(
- func: typing.Callable[[Request], typing.Union[typing.Awaitable[Response], Response]]
+ func: typing.Callable[
+ [Request], typing.Union[typing.Awaitable[Response], Response]
+ ],
) -> ASGIApp:
"""
Takes a function or coroutine `func(request) -> response`,
def websocket_session(
- func: typing.Callable[[WebSocket], typing.Awaitable[None]]
+ func: typing.Callable[[WebSocket], typing.Awaitable[None]],
) -> ASGIApp:
"""
Takes a coroutine `func(session)`, and returns an ASGI application.
def test_read_request_stream_in_app_after_middleware_calls_stream(
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
async def homepage(request: Request):
expected = [b""]
def test_read_request_stream_in_app_after_middleware_calls_body(
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
async def homepage(request: Request):
expected = [b"a", b""]
def test_read_request_body_in_app_after_middleware_calls_stream(
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
async def homepage(request: Request):
assert await request.body() == b""
def test_read_request_body_in_app_after_middleware_calls_body(
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
async def homepage(request: Request):
assert await request.body() == b"a"
def test_read_request_stream_in_dispatch_after_app_calls_stream(
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
async def homepage(request: Request):
expected = [b"a", b""]
def test_read_request_stream_in_dispatch_after_app_calls_body(
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
async def homepage(request: Request):
assert await request.body() == b"a"
def test_read_request_stream_in_dispatch_after_app_calls_body_with_middleware_calling_body_before_call_next( # noqa: E501
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
async def homepage(request: Request):
assert await request.body() == b"a"
def test_read_request_body_in_dispatch_after_app_calls_body_with_middleware_calling_body_before_call_next( # noqa: E501
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
async def homepage(request: Request):
assert await request.body() == b"a"
def test_downstream_middleware_modifies_receive(
- test_client_factory: Callable[[ASGIApp], TestClient]
+ test_client_factory: Callable[[ASGIApp], TestClient],
) -> None:
"""If a downstream middleware modifies receive() the final ASGI app
should see the modified version.