oforms[name] = values[0]
elif ctype.startswith("multipart/"):
def on_field(field):
- oforms[field.field_name] = field.value
+ oforms[field.field_name.decode()] = field.value.decode()
def on_file(file):
- ofiles[field.field_name] = field.value
+ ofiles[file.field_name.decode()] = file.value
multipart.parse_form(headers={"Content-Type": ctype}, input_stream=sys.stdin.buffer, on_field=on_field, on_file=on_file)
return oforms, ofiles
#!/usr/bin/env python3
import os, sys
-from urllib import parse
-import multipart # https://github.com/andrew-d/python-multipart (`apt install python3-multipart`)
-
-
-def get_request_params():
- oforms = {}
- ofiles = {}
- if "REQUEST_URI" in os.environ:
- qforms = parse.parse_qs(parse.urlsplit(os.environ["REQUEST_URI"]).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- if "HTTP_CONTENT_TYPE" in os.environ:
- ctype = os.environ["HTTP_CONTENT_TYPE"]
- if ctype == "application/x-www-form-urlencoded":
- qforms = parse.parse_qs(parse.urlsplit(sys.stdin.read()).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- elif ctype.startswith("multipart/"):
- def on_field(field):
- oforms[field.field_name] = field.value
- def on_file(file):
- ofiles[field.field_name] = field.value
- multipart.parse_form(headers={"Content-Type": ctype}, input_stream=sys.stdin.buffer, on_field=on_field, on_file=on_file)
- return oforms, ofiles
+from requestparser import get_request_params
forms, files = get_request_params()
#!/usr/bin/env python3
import os, sys
-from urllib import parse
-import multipart # https://github.com/andrew-d/python-multipart (`apt install python3-multipart`)
-
-
-def get_request_params():
- oforms = {}
- ofiles = {}
- if "REQUEST_URI" in os.environ:
- qforms = parse.parse_qs(parse.urlsplit(os.environ["REQUEST_URI"]).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- if "HTTP_CONTENT_TYPE" in os.environ:
- ctype = os.environ["HTTP_CONTENT_TYPE"]
- if ctype == "application/x-www-form-urlencoded":
- qforms = parse.parse_qs(parse.urlsplit(sys.stdin.read()).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- elif ctype.startswith("multipart/"):
- def on_field(field):
- oforms[field.field_name] = field.value
- def on_file(file):
- ofiles[field.field_name] = field.value
- multipart.parse_form(headers={"Content-Type": ctype}, input_stream=sys.stdin.buffer, on_field=on_field, on_file=on_file)
- return oforms, ofiles
+from requestparser import get_request_params
forms, files = get_request_params()
#!/usr/bin/env python3
import os, sys
-from urllib import parse
-import multipart # https://github.com/andrew-d/python-multipart (`apt install python3-multipart`)
-
-
-def get_request_params():
- oforms = {}
- ofiles = {}
- if "REQUEST_URI" in os.environ:
- qforms = parse.parse_qs(parse.urlsplit(os.environ["REQUEST_URI"]).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- if "HTTP_CONTENT_TYPE" in os.environ:
- ctype = os.environ["HTTP_CONTENT_TYPE"]
- if ctype == "application/x-www-form-urlencoded":
- qforms = parse.parse_qs(parse.urlsplit(sys.stdin.read()).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- elif ctype.startswith("multipart/"):
- def on_field(field):
- oforms[field.field_name] = field.value
- def on_file(file):
- ofiles[field.field_name] = field.value
- multipart.parse_form(headers={"Content-Type": ctype}, input_stream=sys.stdin.buffer, on_field=on_field, on_file=on_file)
- return oforms, ofiles
+from requestparser import get_request_params
forms, files = get_request_params()
#!/usr/bin/env python3
import os, sys
-from urllib import parse
-import multipart # https://github.com/andrew-d/python-multipart (`apt install python3-multipart`)
-
-
-def get_request_params():
- oforms = {}
- ofiles = {}
- if "REQUEST_URI" in os.environ:
- qforms = parse.parse_qs(parse.urlsplit(os.environ["REQUEST_URI"]).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- if "HTTP_CONTENT_TYPE" in os.environ:
- ctype = os.environ["HTTP_CONTENT_TYPE"]
- if ctype == "application/x-www-form-urlencoded":
- qforms = parse.parse_qs(parse.urlsplit(sys.stdin.read()).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- elif ctype.startswith("multipart/"):
- def on_field(field):
- oforms[field.field_name] = field.value
- def on_file(file):
- ofiles[field.field_name] = field.value
- multipart.parse_form(headers={"Content-Type": ctype}, input_stream=sys.stdin.buffer, on_field=on_field, on_file=on_file)
- return oforms, ofiles
+from requestparser import get_request_params
forms, files = get_request_params()
#!/usr/bin/env python3
import time
import os, sys
-from urllib import parse
-import multipart # https://github.com/andrew-d/python-multipart (`apt install python3-multipart`)
-
-
-def get_request_params():
- oforms = {}
- ofiles = {}
- if "REQUEST_URI" in os.environ:
- qforms = parse.parse_qs(parse.urlsplit(os.environ["REQUEST_URI"]).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- if "HTTP_CONTENT_TYPE" in os.environ:
- ctype = os.environ["HTTP_CONTENT_TYPE"]
- if ctype == "application/x-www-form-urlencoded":
- qforms = parse.parse_qs(parse.urlsplit(sys.stdin.read()).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- elif ctype.startswith("multipart/"):
- def on_field(field):
- oforms[field.field_name] = field.value
- def on_file(file):
- ofiles[field.field_name] = field.value
- multipart.parse_form(headers={"Content-Type": ctype}, input_stream=sys.stdin.buffer, on_field=on_field, on_file=on_file)
- return oforms, ofiles
+from requestparser import get_request_params
forms, files = get_request_params()
<p>No count was specified: %s</p>
</body></html>""" % (count))
-except KeyError:
+except KeyError as ex:
print("Status: 200 Ok")
- print("""\
+ print(f"""\
Content-Type: text/html\n
- <html><body>
+ <html><body>uri: uri={os.environ['REQUEST_URI']} ct={os.environ['CONTENT_TYPE']} ex={ex}
+ forms={forms}
Echo <form method="POST" enctype="application/x-www-form-urlencoded">
<input type="text" name="count">
<input type="text" name="text">
--- /dev/null
+#!/usr/bin/env python3
+import os
+import sys
+from urllib import parse
+import multipart # https://github.com/andrew-d/python-multipart (`apt install python3-multipart`)
+import shutil
+
+
+try: # Windows needs stdio set for binary mode.
+ import msvcrt
+
+ msvcrt.setmode(0, os.O_BINARY) # stdin = 0
+ msvcrt.setmode(1, os.O_BINARY) # stdout = 1
+except ImportError:
+ pass
+
+
+class FileItem:
+
+ def __init__(self, mparse_item):
+ self.item = mparse_item
+
+ @property
+ def file_name(self):
+ return os.path.basename(self.item.file_name.decode())
+
+ def save_to(self, destpath: str):
+ fsrc = self.item.file_object
+ fsrc.seek(0)
+ with open(destpath, 'wb') as fd:
+ shutil.copyfileobj(fsrc, fd)
+
+
+def get_request_params():
+ oforms = {}
+ ofiles = {}
+ if "REQUEST_URI" in os.environ:
+ qforms = parse.parse_qs(parse.urlsplit(os.environ["REQUEST_URI"]).query)
+ for name, values in qforms.items():
+ oforms[name] = values[0]
+ if "CONTENT_TYPE" in os.environ:
+ ctype = os.environ["CONTENT_TYPE"]
+ if ctype == "application/x-www-form-urlencoded":
+ s = sys.stdin.read()
+ qforms = parse.parse_qs(s)
+ for name, values in qforms.items():
+ oforms[name] = values[0]
+ elif ctype.startswith("multipart/"):
+ def on_field(field):
+ oforms[field.field_name.decode()] = field.value.decode()
+ def on_file(file):
+ ofiles[file.field_name.decode()] = FileItem(file)
+ multipart.parse_form(headers={"Content-Type": ctype},
+ input_stream=sys.stdin.buffer,
+ on_field=on_field, on_file=on_file)
+ return oforms, ofiles
+
#!/usr/bin/env python3
import os
import sys
-from urllib import parse
-import multipart # https://github.com/andrew-d/python-multipart (`apt install python3-multipart`)
-
-
-try: # Windows needs stdio set for binary mode.
- import msvcrt
-
- msvcrt.setmode(0, os.O_BINARY) # stdin = 0
- msvcrt.setmode(1, os.O_BINARY) # stdout = 1
-except ImportError:
- pass
-
-def get_request_params():
- oforms = {}
- ofiles = {}
- if "REQUEST_URI" in os.environ:
- qforms = parse.parse_qs(parse.urlsplit(os.environ["REQUEST_URI"]).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- if "HTTP_CONTENT_TYPE" in os.environ:
- ctype = os.environ["HTTP_CONTENT_TYPE"]
- if ctype == "application/x-www-form-urlencoded":
- qforms = parse.parse_qs(parse.urlsplit(sys.stdin.read()).query)
- for name, values in qforms.items():
- oforms[name] = values[0]
- elif ctype.startswith("multipart/"):
- def on_field(field):
- oforms[field.field_name] = field.value
- def on_file(file):
- ofiles[field.field_name] = field.value
- multipart.parse_form(headers={"Content-Type": ctype}, input_stream=sys.stdin.buffer, on_field=on_field, on_file=on_file)
- return oforms, ofiles
+from requestparser import get_request_params
forms, files = get_request_params()
if 'file' in files:
fitem = files['file']
# strip leading path from file name to avoid directory traversal attacks
- fname = fitem.filename
+ fname = os.path.basename(fitem.file_name)
fpath = f'{os.environ["DOCUMENT_ROOT"]}/files/{fname}'
- fitem.save_as(fpath)
+ fitem.save_to(fpath)
message = "The file %s was uploaded successfully" % (fname)
print("Status: 201 Created")
print("Content-Type: text/html")
@pytest.mark.parametrize("path", [
"/004.html", "/proxy/004.html", "/h2proxy/004.html"
])
- def test_h2_003_50(self, env, path):
+ def test_h2_003_50(self, env, path, repeat):
# check that the resource supports ranges and we see its raw content-length
url = env.mkurl("https", "test1", path)
- r = env.curl_get(url, 5)
+ # TODO: sometimes we see a 503 here from h2proxy
+ for i in range(10):
+ r = env.curl_get(url, 5)
+ if r.response["status"] != 503:
+ break
assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
h = r.response["header"]
r = env.nghttp().upload_file(url, fpath, options=options)
assert r.exit_code == 0
assert r.response["status"] >= 200 and r.response["status"] < 300
+ assert 'location' in r.response["header"], f'{r}'
assert r.response["header"]["location"]
r2 = env.nghttp().get(r.response["header"]["location"])
def test_h2_200_01(self, env):
url = env.mkurl("https", "cgi", "/hecho.py")
for x in range(1, 32):
- r = env.curl_post_data(url, "name=x%%%02xx&value=yz" % x)
+ data = f'name=x%{x:02x}x&value=yz'
+ r = env.curl_post_data(url, data)
if x in [13]:
- assert 0 == r.exit_code, "unexpected exit code for char 0x%02x" % x
- assert 200 == r.response["status"], "unexpected status for char 0x%02x" % x
+ assert 0 == r.exit_code, f'unexpected exit code for char 0x{x:02}'
+ assert 200 == r.response["status"], f'unexpected status for char 0x{x:02}'
else:
- assert 0 == r.exit_code, "unexpected exit code for char 0x%02x" % x
- assert 500 == r.response["status"], "unexpected status for char 0x%02x" % x
+ assert 0 == r.exit_code, f'"unexpected exit code for char 0x{x:02}'
+ assert 500 == r.response["status"], f'posting "{data}" unexpected status, {r}'
# let the hecho.py CGI echo chars < 0x20 in field value
# for almost all such characters, the stream returns a 500
def post_name(self, url, name, timeout=5, options=None):
reqbody = ("%s/nghttp.req.body" % self.TMP_DIR)
with open(reqbody, 'w') as f:
- f.write("--DSAJKcd9876\n")
- f.write("Content-Disposition: form-data; name=\"value\"; filename=\"xxxxx\"\n")
- f.write("Content-Type: text/plain\n")
- f.write("\n%s\n" % name)
- f.write("--DSAJKcd9876\n")
+ f.write("--DSAJKcd9876\r\n")
+ f.write("Content-Disposition: form-data; name=\"value\"; filename=\"xxxxx\"\r\n")
+ f.write("Content-Type: text/plain\r\n")
+ f.write(f"\r\n{name}")
+ f.write("\r\n--DSAJKcd9876\r\n")
if not options:
options = []
options.extend([
reqbody = ("%s/nghttp.req.body" % self.TMP_DIR)
with open(fpath, 'rb') as fin:
with open(reqbody, 'wb') as f:
- f.write(("""--DSAJKcd9876
-Content-Disposition: form-data; name="xxx"; filename="xxxxx"
-Content-Type: text/plain
-
-testing mod_h2
---DSAJKcd9876
-Content-Disposition: form-data; name="file"; filename="%s"
-Content-Type: application/octet-stream
-Content-Transfer-Encoding: binary
-
-""" % fname).encode('utf-8'))
+ preamble = [
+ '--DSAJKcd9876',
+ 'Content-Disposition: form-data; name="xxx"; filename="xxxxx"',
+ 'Content-Type: text/plain',
+ '',
+ 'testing mod_h2',
+ '\r\n--DSAJKcd9876',
+ f'Content-Disposition: form-data; name="file"; filename="{fname}"',
+ 'Content-Type: application/octet-stream',
+ 'Content-Transfer-Encoding: binary',
+ '', ''
+ ]
+ f.write('\r\n'.join(preamble).encode('utf-8'))
f.write(fin.read())
- f.write("""
---DSAJKcd9876""".encode('utf-8'))
+ f.write('\r\n'.join([
+ '\r\n--DSAJKcd9876', ''
+ ]).encode('utf-8'))
if not options:
options = []
options.extend([
self._json_out = None
def __repr__(self):
- return f"ExecResult[code={self.exit_code}, args={self._args}, stdout={self._stdout}, stderr={self._stderr}]"
+ out = [
+ f"ExecResult[code={self.exit_code}, args={self._args}\n",
+ "----stdout---------------------------------------\n",
+ self._stdout.decode(),
+ "----stderr---------------------------------------\n",
+ self._stderr.decode()
+ ]
+ return ''.join(out)
@property
def exit_code(self) -> int: