from tornado import gen
from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring
from tornado.httputil import format_timestamp
+from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
from tornado import locale
from tornado.log import app_log, gen_log
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.template import DictLoader
from tornado.testing import AsyncHTTPTestCase, ExpectLog, gen_test
-from tornado.test.util import unittest
+from tornado.test.util import unittest, skipBefore35, exec_test
from tornado.util import u, ObjectDict, unicode_type, timedelta_to_seconds
from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature_v1, create_signed_value, decode_signed_value, ErrorHandler, UIModule, MissingArgumentError, stream_request_body, Finish, removeslash, addslash, RedirectHandler as WebRedirectHandler, get_signature_key_version, GZipContentEncoding
import copy
import datetime
import email.utils
+import gzip
+from io import BytesIO
import itertools
import logging
import os
yield self.close_future
-class StreamingRequestFlowControlTest(WebTestCase):
- def get_handlers(self):
- from tornado.ioloop import IOLoop
-
- # Each method in this handler returns a Future and yields to the
- # IOLoop so the future is not immediately ready. Ensure that the
- # Futures are respected and no method is called before the previous
- # one has completed.
- @stream_request_body
- class FlowControlHandler(RequestHandler):
- def initialize(self, test):
- self.test = test
- self.method = None
- self.methods = []
-
- @contextlib.contextmanager
- def in_method(self, method):
- if self.method is not None:
- self.test.fail("entered method %s while in %s" %
- (method, self.method))
- self.method = method
- self.methods.append(method)
- try:
- yield
- finally:
- self.method = None
-
- @gen.coroutine
- def prepare(self):
- # Note that asynchronous prepare() does not block data_received,
- # so we don't use in_method here.
- self.methods.append('prepare')
- yield gen.Task(IOLoop.current().add_callback)
+# Each method in this handler returns a yieldable object and yields to the
+# IOLoop so the future is not immediately ready. Ensure that the
+# yieldables are respected and no method is called before the previous
+# one has completed.
+@stream_request_body
+class BaseFlowControlHandler(RequestHandler):
+ def initialize(self, test):
+ self.test = test
+ self.method = None
+ self.methods = []
+
+ @contextlib.contextmanager
+ def in_method(self, method):
+ if self.method is not None:
+ self.test.fail("entered method %s while in %s" %
+ (method, self.method))
+ self.method = method
+ self.methods.append(method)
+ try:
+ yield
+ finally:
+ self.method = None
- @gen.coroutine
- def data_received(self, data):
- with self.in_method('data_received'):
- yield gen.Task(IOLoop.current().add_callback)
+ @gen.coroutine
+ def prepare(self):
+ # Note that asynchronous prepare() does not block data_received,
+ # so we don't use in_method here.
+ self.methods.append('prepare')
+ yield gen.Task(IOLoop.current().add_callback)
- @gen.coroutine
- def post(self):
- with self.in_method('post'):
- yield gen.Task(IOLoop.current().add_callback)
- self.write(dict(methods=self.methods))
+ @gen.coroutine
+ def post(self):
+ with self.in_method('post'):
+ yield gen.Task(IOLoop.current().add_callback)
+ self.write(dict(methods=self.methods))
- return [('/', FlowControlHandler, dict(test=self))]
+class BaseStreamingRequestFlowControlTest(object):
def get_httpserver_options(self):
# Use a small chunk size so flow control is relevant even though
# all the data arrives at once.
- return dict(chunk_size=10)
+ return dict(chunk_size=10, decompress_request=True)
- def test_flow_control(self):
+ def get_http_client(self):
+ # simple_httpclient only: curl doesn't support body_producer.
+ return SimpleAsyncHTTPClient(io_loop=self.io_loop)
+
+ # Test all the slightly different code paths for fixed, chunked, etc bodies.
+ def test_flow_control_fixed_body(self):
response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz',
method='POST')
response.rethrow()
'data_received', 'data_received',
'post']))
+ def test_flow_control_chunked_body(self):
+ chunks = [b'abcd', b'efgh', b'ijkl']
+ @gen.coroutine
+ def body_producer(write):
+ for i in chunks:
+ yield write(i)
+ response = self.fetch('/', body_producer=body_producer, method='POST')
+ response.rethrow()
+ self.assertEqual(json_decode(response.body),
+ dict(methods=['prepare', 'data_received',
+ 'data_received', 'data_received',
+ 'post']))
+
+ def test_flow_control_compressed_body(self):
+ bytesio = BytesIO()
+ gzip_file = gzip.GzipFile(mode='w', fileobj=bytesio)
+ gzip_file.write(b'abcdefghijklmnopqrstuvwxyz')
+ gzip_file.close()
+ compressed_body = bytesio.getvalue()
+ response = self.fetch('/', body=compressed_body, method='POST',
+ headers={'Content-Encoding': 'gzip'})
+ response.rethrow()
+ self.assertEqual(json_decode(response.body),
+ dict(methods=['prepare', 'data_received',
+ 'data_received', 'data_received',
+ 'post']))
+
+class DecoratedStreamingRequestFlowControlTest(
+ BaseStreamingRequestFlowControlTest,
+ WebTestCase):
+ def get_handlers(self):
+ class DecoratedFlowControlHandler(BaseFlowControlHandler):
+ @gen.coroutine
+ def data_received(self, data):
+ with self.in_method('data_received'):
+ yield gen.Task(IOLoop.current().add_callback)
+ return [('/', DecoratedFlowControlHandler, dict(test=self))]
+
+
+@skipBefore35
+class NativeStreamingRequestFlowControlTest(
+ BaseStreamingRequestFlowControlTest,
+ WebTestCase):
+ def get_handlers(self):
+ class NativeFlowControlHandler(BaseFlowControlHandler):
+ data_received = exec_test(globals(), locals(), """
+ async def data_received(self, data):
+ with self.in_method('data_received'):
+ await gen.Task(IOLoop.current().add_callback)
+ """)["data_received"]
+ return [('/', NativeFlowControlHandler, dict(test=self))]
+
@wsgi_safe
class IncorrectContentLengthTest(SimpleHandlerTestCase):