-from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ AsyncIterator,
+ Callable,
+ Dict,
+ List,
+ Optional,
+ Tuple,
+ Union,
+)
import httpcore
import sniffio
-from .._content_streams import ByteStream
-
if TYPE_CHECKING: # pragma: no cover
import asyncio
return asyncio.Event()
+async def async_byte_iterator(bytestring: bytes) -> AsyncIterator[bytes]:
+ yield bytestring
+
+
class ASGITransport(httpcore.AsyncHTTPTransport):
"""
A custom AsyncTransport that handles sending requests directly to an ASGI app.
timeout: Dict[str, Optional[float]] = None,
) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], httpcore.AsyncByteStream]:
headers = [] if headers is None else headers
- stream = ByteStream(b"") if stream is None else stream
+ stream = (
+ httpcore.AsyncByteStream(async_byte_iterator(b""))
+ if stream is None
+ else stream
+ )
# ASGI scope.
scheme, host, port, full_path = url
assert status_code is not None
assert response_headers is not None
- stream = ByteStream(b"".join(body_parts))
+ response_body = b"".join(body_parts)
+
+ stream = httpcore.AsyncByteStream(async_byte_iterator(response_body))
return (b"HTTP/1.1", status_code, b"", response_headers, stream)
import httpcore
-from .._content_streams import ByteStream, IteratorStream
-
def _skip_leading_empty_chunks(body: typing.Iterable) -> typing.Iterable:
body = iter(body)
httpcore.SyncByteStream,
]:
headers = [] if headers is None else headers
- stream = ByteStream(b"") if stream is None else stream
+ stream = (
+ httpcore.SyncByteStream(chunk for chunk in [b""])
+ if stream is None
+ else stream
+ )
scheme, host, port, full_path = url
path, _, query = full_path.partition(b"?")
(key.encode("ascii"), value.encode("ascii"))
for key, value in seen_response_headers
]
- stream = IteratorStream(chunk for chunk in result)
+ stream = httpcore.SyncByteStream(chunk for chunk in result)
return (b"HTTP/1.1", status_code, b"", headers, stream)