DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"
+# Data larger than this will be read in chunks, to prevent extreme
+# overallocation.
+_MIN_READ_BUF_SIZE = 1 << 20
+
class HTTPServer(socketserver.TCPServer):
allow_reuse_address = 1 # Seems to make sense in testing environment
env = env
)
if self.command.lower() == "post" and nbytes > 0:
- data = self.rfile.read(nbytes)
+ cursize = 0
+ data = self.rfile.read(min(nbytes, _MIN_READ_BUF_SIZE))
+ while len(data) < nbytes and len(data) != cursize:
+ cursize = len(data)
+ # This is a geometric increase in read size (never more
+ # than doubling out the current length of data per loop
+ # iteration).
+ delta = min(cursize, nbytes - cursize)
+ try:
+ data += self.rfile.read(delta)
+ except TimeoutError:
+ break
else:
data = None
# throw away additional data [see bug #427345]
print("</pre>")
"""
+cgi_file7 = """\
+#!%s
+import os
+import sys
+
+print("Content-type: text/plain")
+print()
+
+content_length = int(os.environ["CONTENT_LENGTH"])
+body = sys.stdin.buffer.read(content_length)
+
+print(f"{content_length} {len(body)}")
+"""
+
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
"This test can't be run reliably as root (issue #13308).")
self.file3_path = None
self.file4_path = None
self.file5_path = None
+ self.file6_path = None
+ self.file7_path = None
# The shebang line should be pure ASCII: use symlink if possible.
# See issue #7668.
file6.write(cgi_file6 % self.pythonexe)
os.chmod(self.file6_path, 0o777)
+ self.file7_path = os.path.join(self.cgi_dir, 'file7.py')
+ with open(self.file7_path, 'w', encoding='utf-8') as file7:
+ file7.write(cgi_file7 % self.pythonexe)
+ os.chmod(self.file7_path, 0o777)
+
os.chdir(self.parent_dir)
def tearDown(self):
os.remove(self.file5_path)
if self.file6_path:
os.remove(self.file6_path)
+ if self.file7_path:
+ os.remove(self.file7_path)
os.rmdir(self.cgi_child_dir)
os.rmdir(self.cgi_dir)
os.rmdir(self.cgi_dir_in_sub_dir)
self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
+ def test_large_content_length(self):
+ for w in range(15, 25):
+ size = 1 << w
+ body = b'X' * size
+ headers = {'Content-Length' : str(size)}
+ res = self.request('/cgi-bin/file7.py', 'POST', body, headers)
+ self.assertEqual(res.read(), b'%d %d' % (size, size) + self.linesep)
+
+ def test_large_content_length_truncated(self):
+ with support.swap_attr(self.request_handler, 'timeout', 0.001):
+ for w in range(18, 65):
+ size = 1 << w
+ headers = {'Content-Length' : str(size)}
+ res = self.request('/cgi-bin/file1.py', 'POST', b'x', headers)
+ self.assertEqual(res.read(), b'Hello World' + self.linesep)
+
def test_invaliduri(self):
res = self.request('/cgi-bin/invalid')
res.read()