class IteratorByteStream(SyncByteStream):
+ CHUNK_SIZE = 65_536
+
def __init__(self, stream: Iterable[bytes]):
self._stream = stream
self._is_stream_consumed = False
raise StreamConsumed()
self._is_stream_consumed = True
- for part in self._stream:
- yield part
+ if hasattr(self._stream, "read"):
+ # File-like interfaces should use 'read' directly.
+ chunk = self._stream.read(self.CHUNK_SIZE) # type: ignore
+ while chunk:
+ yield chunk
+ chunk = self._stream.read(self.CHUNK_SIZE) # type: ignore
+ else:
+ # Otherwise iterate.
+ for part in self._stream:
+ yield part
class AsyncIteratorByteStream(AsyncByteStream):
+ CHUNK_SIZE = 65_536
+
def __init__(self, stream: AsyncIterable[bytes]):
self._stream = stream
self._is_stream_consumed = False
raise StreamConsumed()
self._is_stream_consumed = True
- async for part in self._stream:
- yield part
+ if hasattr(self._stream, "aread"):
+ # File-like interfaces should use 'aread' directly.
+ chunk = await self._stream.aread(self.CHUNK_SIZE) # type: ignore
+ while chunk:
+ yield chunk
+ chunk = await self._stream.aread(self.CHUNK_SIZE) # type: ignore
+ else:
+ # Otherwise iterate.
+ async for part in self._stream:
+ yield part
class UnattachedStream(AsyncByteStream, SyncByteStream):
A single file field item, within a multipart form field.
"""
+ CHUNK_SIZE = 64 * 1024
+
def __init__(self, name: str, value: FileTypes) -> None:
self.name = name
self.file.seek(0)
self._consumed = True
- for chunk in self.file:
+ chunk = self.file.read(self.CHUNK_SIZE)
+ while chunk:
yield to_bytes(chunk)
+ chunk = self.file.read(self.CHUNK_SIZE)
def render(self) -> typing.Iterator[bytes]:
yield self.render_headers()
assert content == b"Hello, world!"
+@pytest.mark.asyncio
+async def test_async_bytesio_content():
+ class AsyncBytesIO:
+ def __init__(self, content):
+ self._idx = 0
+ self._content = content
+
+ async def aread(self, chunk_size: int):
+ chunk = self._content[self._idx : self._idx + chunk_size]
+ self._idx = self._idx + chunk_size
+ return chunk
+
+ async def __aiter__(self):
+ yield self._content # pragma: nocover
+
+ headers, stream = encode_request(content=AsyncBytesIO(b"Hello, world!"))
+ assert not isinstance(stream, typing.Iterable)
+ assert isinstance(stream, typing.AsyncIterable)
+
+ content = b"".join([part async for part in stream])
+
+ assert headers == {"Transfer-Encoding": "chunked"}
+ assert content == b"Hello, world!"
+
+
@pytest.mark.asyncio
async def test_iterator_content():
def hello_world():