self.filename = filename
self.file = fileobj
self.headers = headers
- self._consumed = False
def get_length(self) -> int:
headers = self.render_headers()
yield self._data
return
- if self._consumed: # pragma: nocover
+ if hasattr(self.file, "seek"):
self.file.seek(0)
- self._consumed = True
chunk = self.file.read(self.CHUNK_SIZE)
while chunk:
import cgi
import io
import os
+import tempfile
import typing
from unittest import mock
assert content == b"".join(stream)
+def test_multipart_rewinds_files():
+ with tempfile.TemporaryFile() as upload:
+ upload.write(b"Hello, world!")
+
+ transport = httpx.MockTransport(echo_request_content)
+ client = httpx.Client(transport=transport)
+
+ files = {"file": upload}
+ response = client.post("http://127.0.0.1:8000/", files=files)
+ assert response.status_code == 200
+ assert b"\r\nHello, world!\r\n" in response.content
+
+ # POSTing the same file instance a second time should have the same content.
+ files = {"file": upload}
+ response = client.post("http://127.0.0.1:8000/", files=files)
+ assert response.status_code == 200
+ assert b"\r\nHello, world!\r\n" in response.content
+
+
class TestHeaderParamHTML5Formatting:
def test_unicode(self):
param = format_form_param("filename", "n\u00e4me")