import time
import urlparse
+from tornado.escape import utf8
from tornado import httputil
from tornado import ioloop
from tornado import iostream
for field in fields:
k, sep, v = field.strip().partition("=")
if k == "boundary" and v:
- self._parse_mime_body(v, data)
+ self._parse_mime_body(utf8(v), data)
break
else:
logging.warning("Invalid multipart/form-data")
# xmpp). I think we're also supposed to handle backslash-escapes
# here but I'll save that until we see a client that uses them
# in the wild.
- if boundary.startswith('"') and boundary.endswith('"'):
+ if boundary.startswith(b('"')) and boundary.endswith(b('"')):
boundary = boundary[1:-1]
- if data.endswith("\r\n"):
+ if data.endswith(b("\r\n")):
footer_length = len(boundary) + 6
else:
footer_length = len(boundary) + 4
- parts = data[:-footer_length].split("--" + boundary + "\r\n")
+ parts = data[:-footer_length].split(b("--") + boundary + b("\r\n"))
for part in parts:
if not part: continue
- eoh = part.find("\r\n\r\n")
+ eoh = part.find(b("\r\n\r\n"))
if eoh == -1:
logging.warning("multipart/form-data missing headers")
continue
- headers = httputil.HTTPHeaders.parse(part[:eoh])
+ headers = httputil.HTTPHeaders.parse(part[:eoh].decode("latin1"))
name_header = headers.get("Content-Disposition", "")
if not name_header.startswith("form-data;") or \
- not part.endswith("\r\n"):
+ not part.endswith(b("\r\n")):
logging.warning("Invalid multipart/form-data")
continue
value = part[eoh + 4:-2]
name_values = {}
for name_part in name_header[10:].split(";"):
name, name_value = name_part.strip().split("=", 1)
- name_values[name] = name_value.strip('"').decode("utf-8")
+ name_values[name] = name_value.strip('"')
if not name_values.get("name"):
logging.warning("multipart/form-data value missing name")
continue
- name = name_values["name"].decode("utf-8")
+ name = name_values["name"]
if name_values.get("filename"):
ctype = headers.get("Content-Type", "application/unknown")
self._request.files.setdefault(name, []).append(dict(
#!/usr/bin/env python
+from tornado import httpclient, simple_httpclient
+from tornado.escape import json_decode, utf8, _unicode
+from tornado.iostream import IOStream
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.testing import AsyncHTTPTestCase, LogTrapTestCase
from tornado.util import b
from tornado.web import Application, RequestHandler
+import logging
import os
import re
+import socket
import unittest
import urllib
if ssl is None:
del SSLTest
+
+class MultipartTestHandler(RequestHandler):
+ def post(self):
+ self.finish({"header": self.request.headers["X-Header-Encoding-Test"],
+ "argument": self.get_argument("argument"),
+ "filename": self.request.files["files"][0]["filename"],
+ "filebody": _unicode(self.request.files["files"][0]["body"]),
+ })
+
+class RawRequestHTTPConnection(simple_httpclient._HTTPConnection):
+ def set_request(self, request):
+ self.__next_request = request
+
+ def _on_connect(self, parsed):
+ self.stream.write(self.__next_request)
+ self.__next_request = None
+ self.stream.read_until(b("\r\n\r\n"), self._on_headers)
+
+class HTTPConnectionTest(AsyncHTTPTestCase, LogTrapTestCase):
+ def get_app(self):
+ return Application([("/multipart", MultipartTestHandler)])
+
+ def raw_fetch(self, headers, body):
+ conn = RawRequestHTTPConnection(self.io_loop, self.http_client,
+ httpclient.HTTPRequest(self.get_url("/")),
+ self.stop)
+ conn.set_request(
+ b("\r\n").join(headers +
+ [utf8("Content-Length: %d\r\n" % len(body))]) +
+ b("\r\n") + body)
+ response = self.wait()
+ response.rethrow()
+ return response
+
+ def test_multipart_form(self):
+ # Encodings here are tricky: Headers are latin1, bodies can be
+ # anything (we use utf8 by default).
+ response = self.raw_fetch([
+ b("POST /multipart HTTP/1.0"),
+ b("Content-Type: multipart/form-data; boundary=1234567890"),
+ u"X-Header-encoding-test: \u00e9".encode("latin1"),
+ ],
+ b("\r\n").join([
+ b("Content-Disposition: form-data; name=argument"),
+ b(""),
+ u"\u00e1".encode("utf-8"),
+ b("--1234567890"),
+ u'Content-Disposition: form-data; name="files"; filename="\u00f3"'.encode("latin1"),
+ b(""),
+ u"\u00fa".encode("utf-8"),
+ b("--1234567890"),
+ b(""),
+ b(""),
+ ]))
+ data = json_decode(response.body)
+ self.assertEqual(u"\u00e9", data["header"])
+ self.assertEqual(u"\u00e1", data["argument"])
+ self.assertEqual(u"\u00f3", data["filename"])
+ self.assertEqual(u"\u00fa", data["filebody"])
+