if self.chunked:
return self._read_chunked(amt)
- if amt is not None:
+ if amt is not None and amt >= 0:
if self.length is not None and amt > self.length:
# clip the read to the "end of response"
amt = self.length
def _read_chunked(self, amt=None):
assert self.chunked != _UNKNOWN
+ if amt is not None and amt < 0:
+ amt = None
value = []
try:
while (chunk_left := self._get_chunk_left()) is not None:
self.assertEqual(resp.read(), expected)
resp.close()
+ # Explicit full read
+ for n in (-123, -1, None):
+ with self.subTest('full read', n=n):
+ sock = FakeSocket(chunked_start + last_chunk + chunked_end)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertTrue(resp.chunked)
+ self.assertEqual(resp.read(n), expected)
+ resp.close()
+
+ # Read first chunk
+ with self.subTest('read1(-1)'):
+ sock = FakeSocket(chunked_start + last_chunk + chunked_end)
+ resp = client.HTTPResponse(sock, method="GET")
+ resp.begin()
+ self.assertTrue(resp.chunked)
+ self.assertEqual(resp.read1(-1), b"hello worl")
+ resp.close()
+
# Various read sizes
for n in range(1, 12):
sock = FakeSocket(chunked_start + last_chunk + chunked_end)