+import typing
from typing import Callable, Dict, List, Optional, Tuple
import httpcore
+import sniffio
from .._content_streams import ByteStream
+if typing.TYPE_CHECKING: # pragma: no cover
+ import asyncio
+ import trio
+
+ Event = typing.Union[asyncio.Event, trio.Event]
+
+
+def create_event() -> "Event":
+ if sniffio.current_async_library() == "trio":
+ import trio
+
+ return trio.Event()
+ else:
+ import asyncio
+
+ return asyncio.Event()
+
class ASGIDispatch(httpcore.AsyncHTTPTransport):
"""
status_code = None
response_headers = None
body_parts = []
+ request_complete = False
response_started = False
- response_complete = False
+ response_complete = create_event()
headers = [] if headers is None else headers
stream = ByteStream(b"") if stream is None else stream
request_body_chunks = stream.__aiter__()
async def receive() -> dict:
- nonlocal response_complete
+ nonlocal request_complete, response_complete
- if response_complete:
+ if request_complete:
+ await response_complete.wait()
return {"type": "http.disconnect"}
try:
body = await request_body_chunks.__anext__()
except StopAsyncIteration:
+ request_complete = True
return {"type": "http.request", "body": b"", "more_body": False}
return {"type": "http.request", "body": body, "more_body": True}
response_started = True
elif message["type"] == "http.response.body":
- assert not response_complete
+ assert not response_complete.is_set()
body = message.get("body", b"")
more_body = message.get("more_body", False)
body_parts.append(body)
if not more_body:
- response_complete = True
+ response_complete.set()
try:
await self.app(scope, receive, send)
if self.raise_app_exceptions or not response_complete:
raise
- assert response_complete
+ assert response_complete.is_set()
assert status_code is not None
assert response_headers is not None