logging.warning("multipart/form-data missing headers")
continue
headers = HTTPHeaders.parse(part[:eoh].decode("utf-8"))
- name_header = headers.get("Content-Disposition", "")
- if not name_header.startswith("form-data;") or \
- not part.endswith(b("\r\n")):
+ disp_header = headers.get("Content-Disposition", "")
+ disposition, disp_params = _parse_header(disp_header)
+ if disposition != "form-data" or not part.endswith(b("\r\n")):
logging.warning("Invalid multipart/form-data")
continue
value = part[eoh + 4:-2]
- name_values = {}
- for name_part in name_header[10:].split(";"):
- name, name_value = name_part.strip().split("=", 1)
- name_values[name] = name_value.strip('"')
- if not name_values.get("name"):
+ if not disp_params.get("name"):
logging.warning("multipart/form-data value missing name")
continue
- name = name_values["name"]
- if name_values.get("filename"):
+ name = disp_params["name"]
+ if disp_params.get("filename"):
ctype = headers.get("Content-Type", "application/unknown")
files.setdefault(name, []).append(dict(
- filename=name_values["filename"], body=value,
+ filename=disp_params["filename"], body=value,
content_type=ctype))
else:
arguments.setdefault(name, []).append(value)
+# _parseparam and _parse_header are copied and modified from python2.7's cgi.py
+# The original 2.7 version of this code did not correctly support some
+# combinations of semicolons and double quotes.
+def _parseparam(s):
+    while s[:1] == ';':
+        s = s[1:]
+        end = s.find(';')
+        while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
+            end = s.find(';', end + 1)
+        if end < 0:
+            end = len(s)
+        f = s[:end]
+        yield f.strip()
+        s = s[end:]
+
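+# For example, a semicolon inside a double-quoted value does not end the
+# parameter, so list(_parseparam('; name="a"; filename="a;b.txt"')) should
+# yield ['name="a"', 'filename="a;b.txt"'].
+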
+def _parse_header(line):
+    """Parse a Content-type like header.
+
+    Return the main content-type and a dictionary of options.
+
+    """
+    parts = _parseparam(';' + line)
+    key = parts.next()
+    pdict = {}
+    for p in parts:
+        i = p.find('=')
+        if i >= 0:
+            name = p[:i].strip().lower()
+            value = p[i+1:].strip()
+            if len(value) >= 2 and value[0] == value[-1] == '"':
+                value = value[1:-1]
+                value = value.replace('\\\\', '\\').replace('\\"', '"')
+            pdict[name] = value
+    return key, pdict
+
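+# For example, _parse_header('form-data; name="files"; filename="a;b.txt"')
+# should return ('form-data', {'name': 'files', 'filename': 'a;b.txt'}),
+# stripping the quotes while keeping the quoted semicolon in the filename.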
+
def doctests():
    import doctest
    return doctest.DocTestSuite()
#!/usr/bin/env python
-from tornado.httputil import url_concat
+from tornado.httputil import url_concat, parse_multipart_form_data
+from tornado.escape import utf8
+from tornado.testing import LogTrapTestCase
+from tornado.util import b
+import logging
import unittest
            [],
            )
        self.assertEqual(url, "https://localhost/path?r=1&t=2")
+
+class MultipartFormDataTest(LogTrapTestCase):
+    def test_file_upload(self):
+        data = b("""\
+--1234
+Content-Disposition: form-data; name="files"; filename="ab.txt"
+
+Foo
+--1234--""").replace(b("\n"), b("\r\n"))
+        args = {}
+        files = {}
+        parse_multipart_form_data(b("1234"), data, args, files)
+        file = files["files"][0]
+        self.assertEqual(file["filename"], "ab.txt")
+        self.assertEqual(file["body"], b("Foo"))
+
+    def test_unquoted_names(self):
+        # quotes are optional unless special characters are present
+        data = b("""\
+--1234
+Content-Disposition: form-data; name=files; filename=ab.txt
+
+Foo
+--1234--""").replace(b("\n"), b("\r\n"))
+        args = {}
+        files = {}
+        parse_multipart_form_data(b("1234"), data, args, files)
+        file = files["files"][0]
+        self.assertEqual(file["filename"], "ab.txt")
+        self.assertEqual(file["body"], b("Foo"))
+
+    def test_special_filenames(self):
+        filenames = ['a;b.txt',
+                     'a"b.txt',
+                     'a";b.txt',
+                     'a;"b.txt',
+                     'a";";.txt',
+                     'a\\"b.txt',
+                     'a\\b.txt',
+                     ]
+        for filename in filenames:
+            logging.info("trying filename %r", filename)
+            data = """\
+--1234
+Content-Disposition: form-data; name="files"; filename="%s"
+
+Foo
+--1234--""" % filename.replace('\\', '\\\\').replace('"', '\\"')
+            data = utf8(data.replace("\n", "\r\n"))
+            args = {}
+            files = {}
+            parse_multipart_form_data(b("1234"), data, args, files)
+            file = files["files"][0]
+            self.assertEqual(file["filename"], filename)
+            self.assertEqual(file["body"], b("Foo"))