- Updated Tests.
- Don't change signature of public methods.
.. automethod:: RequestHandler.get_argument
.. automethod:: RequestHandler.get_arguments
+ .. automethod:: RequestHandler.get_query_argument
+ .. automethod:: RequestHandler.get_query_arguments
+ .. automethod:: RequestHandler.get_body_argument
+ .. automethod:: RequestHandler.get_body_arguments
.. automethod:: RequestHandler.decode_argument
.. attribute:: RequestHandler.request
if self._request.method in ("POST", "PATCH", "PUT"):
httputil.parse_body_arguments(
self._request.headers.get("Content-Type", ""), data,
- self._request.arguments, self._request.body_arguments, self._request.files)
+ self._request.body_arguments, self._request.files)
+
+ for k, v in self._request.body_arguments.iteritems():
+ self._request.arguments.setdefault(k, []).extend(v)
self.request_callback(self._request)
return int(val)
-def parse_body_arguments(content_type, body, arguments, body_arguments, files):
+def parse_body_arguments(content_type, body, arguments, files):
"""Parses a form request body.
Supports ``application/x-www-form-urlencoded`` and
for name, values in uri_arguments.items():
if values:
arguments.setdefault(name, []).extend(values)
- body_arguments.setdefault(name, []).extend(values)
elif content_type.startswith("multipart/form-data"):
fields = content_type.split(";")
for field in fields:
k, sep, v = field.strip().partition("=")
if k == "boundary" and v:
- parse_multipart_form_data(utf8(v), body, arguments, body_arguments, files)
+ parse_multipart_form_data(utf8(v), body, arguments, files)
break
else:
gen_log.warning("Invalid multipart/form-data")
-def parse_multipart_form_data(boundary, data, arguments, body_arguments, files):
+def parse_multipart_form_data(boundary, data, arguments, files):
"""Parses a ``multipart/form-data`` body.
The ``boundary`` and ``data`` parameters are both byte strings.
content_type=ctype))
else:
arguments.setdefault(name, []).append(value)
- body_arguments.setdefault(name, []).append(value)
def format_timestamp(ts):
Foo
--1234--""".replace(b"\n", b"\r\n")
args = {}
- body_args = {}
files = {}
- parse_multipart_form_data(b"1234", data, args, body_args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
self.assertEqual(file["body"], b"Foo")
Foo
--1234--""".replace(b"\n", b"\r\n")
args = {}
- body_args = {}
files = {}
- parse_multipart_form_data(b"1234", data, args, body_args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
self.assertEqual(file["body"], b"Foo")
--1234--""" % filename.replace('\\', '\\\\').replace('"', '\\"')
data = utf8(data.replace("\n", "\r\n"))
args = {}
- body_args = {}
files = {}
- parse_multipart_form_data(b"1234", data, args, body_args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], filename)
self.assertEqual(file["body"], b"Foo")
Foo
--1234--'''.replace(b"\n", b"\r\n")
args = {}
- body_args = {}
files = {}
- parse_multipart_form_data(b'"1234"', data, args, body_args, files)
+ parse_multipart_form_data(b'"1234"', data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
self.assertEqual(file["body"], b"Foo")
Foo
--1234--'''.replace(b"\n", b"\r\n")
args = {}
- body_args = {}
files = {}
with ExpectLog(gen_log, "multipart/form-data missing headers"):
- parse_multipart_form_data(b"1234", data, args, body_args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
self.assertEqual(files, {})
def test_invalid_content_disposition(self):
Foo
--1234--'''.replace(b"\n", b"\r\n")
args = {}
- body_args = {}
files = {}
with ExpectLog(gen_log, "Invalid multipart/form-data"):
- parse_multipart_form_data(b"1234", data, args, body_args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
self.assertEqual(files, {})
def test_line_does_not_end_with_correct_line_break(self):
Foo--1234--'''.replace(b"\n", b"\r\n")
args = {}
- body_args = {}
files = {}
with ExpectLog(gen_log, "Invalid multipart/form-data"):
- parse_multipart_form_data(b"1234", data, args, body_args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
self.assertEqual(files, {})
def test_content_disposition_header_without_name_parameter(self):
Foo
--1234--""".replace(b"\n", b"\r\n")
args = {}
- body_args = {}
files = {}
with ExpectLog(gen_log, "multipart/form-data value missing name"):
- parse_multipart_form_data(b"1234", data, args, body_args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
self.assertEqual(files, {})
def test_data_after_final_boundary(self):
--1234--
""".replace(b"\n", b"\r\n")
args = {}
- body_args = {}
files = {}
- parse_multipart_form_data(b"1234", data, args, body_args, files)
+ parse_multipart_form_data(b"1234", data, args, files)
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
self.assertEqual(file["body"], b"Foo")
import socket
import sys
+try:
+ import urllib.parse as urllib_parse # py3
+except ImportError:
+ import urllib as urllib_parse # py2
+
wsgi_safe_tests = []
relpath = lambda *a: os.path.join(os.path.dirname(__file__), *a)
if type(value) != bytes_type:
raise Exception("incorrect type for value: %r" %
type(value))
- for value in self.get_arguments(key, self.request.arguments):
+ for value in self.get_arguments(key):
if type(value) != unicode_type:
raise Exception("incorrect type for value: %r" %
type(value))
def get(self):
self.write(self.get_argument("foo", "default"))
+ def post(self):
+ self.write(self.get_argument("foo", "default"))
+
+
+class GetQueryArgumentHandler(RequestHandler):
+ def post(self):
+ self.write(self.get_query_argument("foo", "default"))
+
+
+class GetBodyArgumentHandler(RequestHandler):
+ def post(self):
+ self.write(self.get_body_argument("foo", "default"))
+
# This test is shared with wsgi_test.py
@wsgi_safe
url("/redirect", RedirectHandler),
url("/header_injection", HeaderInjectionHandler),
url("/get_argument", GetArgumentHandler),
+ url("/get_query_argument", GetQueryArgumentHandler),
+ url("/get_body_argument", GetBodyArgumentHandler),
]
return urls
response = self.fetch("/get_argument")
self.assertEqual(response.body, b"default")
+ # test merging of query and body arguments
+ # body arguments overwrite query arguments
+ body = urllib_parse.urlencode(dict(foo="hello"))
+ response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
+ self.assertEqual(response.body, b"hello")
+
+ def test_get_query_arguments(self):
+ # send as a post so we can ensure the separation between query
+ # string and body arguments.
+ body = urllib_parse.urlencode(dict(foo="hello"))
+ response = self.fetch("/get_query_argument?foo=bar", method="POST", body=body)
+ self.assertEqual(response.body, b"bar")
+ response = self.fetch("/get_query_argument?foo=", method="POST", body=body)
+ self.assertEqual(response.body, b"")
+ response = self.fetch("/get_query_argument", method="POST", body=body)
+ self.assertEqual(response.body, b"default")
+
+ def test_get_body_arguments(self):
+ body = urllib_parse.urlencode(dict(foo="bar"))
+ response = self.fetch("/get_body_argument?foo=hello", method="POST", body=body)
+ self.assertEqual(response.body, b"bar")
+
+ body = urllib_parse.urlencode(dict(foo=""))
+ response = self.fetch("/get_body_argument?foo=hello", method="POST", body=body)
+ self.assertEqual(response.body, b"")
+
+ body = urllib_parse.urlencode(dict())
+ response = self.fetch("/get_body_argument?foo=hello", method="POST", body=body)
+ self.assertEqual(response.body, b"default")
+
def test_no_gzip(self):
response = self.fetch('/get_argument')
self.assertNotIn('Accept-Encoding', response.headers.get('Vary', ''))
The returned value is always unicode.
"""
- return self._get_argument(name, self.request.arguments, default, strip)
+ return self._get_argument(name, default, self.request.arguments, strip)
+
+ def get_arguments(self, name, strip=True):
+ """Returns a list of the arguments with the given name.
+
+ If the argument is not present, returns an empty list.
+
+ The returned values are always unicode.
+ """
+ return self._get_arguments(name, self.request.arguments, strip)
def get_body_argument(self, name, default=_ARG_DEFAULT, strip=True):
"""Returns the value of the argument with the given name
The returned value is always unicode.
"""
- return self._get_argument(name, self.request.body_arguments, default, strip)
+ return self._get_argument(name, default, self.request.body_arguments, strip)
+
+ def get_body_arguments(self, name, strip=True):
+ """Returns a list of the body arguments with the given name.
+
+ If the argument is not present, returns an empty list.
+
+ The returned values are always unicode.
+ """
+ return self._get_arguments(name, self.request.body_arguments, strip)
def get_query_argument(self, name, default=_ARG_DEFAULT, strip=True):
"""Returns the value of the argument with the given name
The returned value is always unicode.
"""
- return self._get_argument(name, self.request.query_arguments, default, strip)
+ return self._get_argument(name, default, self.request.query_arguments, strip)
- def _get_argument(self, name, source, default=_ARG_DEFAULT, strip=True):
- args = self.get_arguments(name, source, strip=strip)
- if not args:
- if default is self._ARG_DEFAULT:
- raise MissingArgumentError(name)
- return default
- return args[-1]
-
- def get_arguments(self, name, source, strip=True):
- """Returns a list of the arguments with the given name.
+ def get_query_arguments(self, name, strip=True):
+ """Returns a list of the query arguments with the given name.
If the argument is not present, returns an empty list.
The returned values are always unicode.
"""
+ return self._get_arguments(name, self.request.query_arguments, strip)
+
+ def _get_argument(self, name, default, source, strip=True):
+ args = self._get_arguments(name, source, strip=strip)
+ if not args:
+ if default is self._ARG_DEFAULT:
+ raise MissingArgumentError(name)
+ return default
+ return args[-1]
+ def _get_arguments(self, name, source, strip=True):
values = []
for v in source.get(name, []):
v = self.decode_argument(v, name=name)
# Parse request body
self.files = {}
httputil.parse_body_arguments(self.headers.get("Content-Type", ""),
- self.body, self.arguments, self.body_arguments, self.files)
+ self.body, self.arguments, self.files)
self._start_time = time.time()
self._finish_time = None