"ignore: starlette.middleware.wsgi is deprecated and will be removed in a future release.*:DeprecationWarning",
"ignore: Async generator 'starlette.requests.Request.stream' was garbage collected before it had been exhausted.*:ResourceWarning",
"ignore: Use 'content=<...>' to upload raw bytes/text content.:DeprecationWarning",
- # TODO: This warning appeared when we bumped anyio to 4.4.0.
- "ignore: Unclosed .MemoryObject(Send|Receive)Stream.:ResourceWarning",
]
[tool.coverage.run]
import typing
import anyio
-from anyio.abc import ObjectReceiveStream, ObjectSendStream
from starlette._utils import collapse_excgroups
from starlette.requests import ClientDisconnect, Request
async def call_next(request: Request) -> Response:
app_exc: Exception | None = None
- send_stream: ObjectSendStream[typing.MutableMapping[str, typing.Any]]
- recv_stream: ObjectReceiveStream[typing.MutableMapping[str, typing.Any]]
- send_stream, recv_stream = anyio.create_memory_object_stream()
async def receive_or_disconnect() -> Message:
if response_sent.is_set():
return message
- async def close_recv_stream_on_response_sent() -> None:
- await response_sent.wait()
- recv_stream.close()
-
async def send_no_error(message: Message) -> None:
try:
await send_stream.send(message)
async def coro() -> None:
nonlocal app_exc
- async with send_stream:
+ with send_stream:
try:
await self.app(scope, receive_or_disconnect, send_no_error)
except Exception as exc:
app_exc = exc
- task_group.start_soon(close_recv_stream_on_response_sent)
task_group.start_soon(coro)
try:
assert message["type"] == "http.response.start"
async def body_stream() -> typing.AsyncGenerator[bytes, None]:
- async with recv_stream:
- async for message in recv_stream:
- assert message["type"] == "http.response.body"
- body = message.get("body", b"")
- if body:
- yield body
- if not message.get("more_body", False):
- break
+ async for message in recv_stream:
+ assert message["type"] == "http.response.body"
+ body = message.get("body", b"")
+ if body:
+ yield body
+ if not message.get("more_body", False):
+ break
if app_exc is not None:
raise app_exc
response.raw_headers = message["headers"]
return response
- with collapse_excgroups():
+ send_stream, recv_stream = anyio.create_memory_object_stream[Message]()
+ with recv_stream, send_stream, collapse_excgroups():
async with anyio.create_task_group() as task_group:
response = await self.dispatch_func(request, call_next)
await response(scope, wrapped_receive, send)
response_sent.set()
+ recv_stream.close()
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
raise NotImplementedError() # pragma: no cover
import anyio
import anyio.abc
import anyio.from_thread
-from anyio.abc import ObjectReceiveStream, ObjectSendStream
from anyio.streams.stapled import StapledObjectStream
from starlette._utils import is_async_callable
def reset_portal() -> None:
self.portal = None
- send1: ObjectSendStream[typing.MutableMapping[str, typing.Any] | None]
- receive1: ObjectReceiveStream[typing.MutableMapping[str, typing.Any] | None]
- send2: ObjectSendStream[typing.MutableMapping[str, typing.Any]]
- receive2: ObjectReceiveStream[typing.MutableMapping[str, typing.Any]]
- send1, receive1 = anyio.create_memory_object_stream(math.inf)
- send2, receive2 = anyio.create_memory_object_stream(math.inf)
+ send1, receive1 = anyio.create_memory_object_stream[
+ typing.Union[typing.MutableMapping[str, typing.Any], None]
+ ](math.inf)
+ send2, receive2 = anyio.create_memory_object_stream[typing.MutableMapping[str, typing.Any]](math.inf)
+ for channel in (send1, send2, receive1, receive2):
+ stack.callback(channel.close)
self.stream_send = StapledObjectStream(send1, receive1)
self.stream_receive = StapledObjectStream(send2, receive2)
self.task = portal.start_task_soon(self.lifespan)
self.task.result()
return message
- async with self.stream_send, self.stream_receive:
- await self.stream_receive.send({"type": "lifespan.shutdown"})
- message = await receive()
- assert message["type"] in (
- "lifespan.shutdown.complete",
- "lifespan.shutdown.failed",
- )
- if message["type"] == "lifespan.shutdown.failed":
- await receive()
+ await self.stream_receive.send({"type": "lifespan.shutdown"})
+ message = await receive()
+ assert message["type"] in (
+ "lifespan.shutdown.complete",
+ "lifespan.shutdown.failed",
+ )
+ if message["type"] == "lifespan.shutdown.failed":
+ await receive()