from __future__ import absolute_import, division, print_function
+import asyncio
+
from concurrent.futures import ThreadPoolExecutor
from tornado import gen
from tornado.ioloop import IOLoop
+from tornado.platform.asyncio import AsyncIOLoop, to_asyncio_future, AnyThreadEventLoopPolicy
from tornado.testing import AsyncTestCase, gen_test
-from tornado.test.util import unittest, skipBefore33, skipBefore35, exec_test
-
-try:
- from tornado.platform.asyncio import asyncio
-except ImportError:
- asyncio = None
-else:
- from tornado.platform.asyncio import AsyncIOLoop, to_asyncio_future, AnyThreadEventLoopPolicy
- # This is used in dynamically-evaluated code, so silence pyflakes.
- to_asyncio_future
+from tornado.test.util import unittest
-@unittest.skipIf(asyncio is None, "asyncio module not present")
class AsyncIOLoopTest(AsyncTestCase):
def get_new_ioloop(self):
io_loop = AsyncIOLoop()
asyncio.get_event_loop().run_in_executor(None, lambda: 42))
self.assertEqual(x, 42)
- @skipBefore33
@gen_test
def test_asyncio_yield_from(self):
- # Test that we can use asyncio coroutines with 'yield from'
- # instead of asyncio.async(). This requires python 3.3 syntax.
- namespace = exec_test(globals(), locals(), """
@gen.coroutine
def f():
event_loop = asyncio.get_event_loop()
x = yield from event_loop.run_in_executor(None, lambda: 42)
return x
- """)
- result = yield namespace['f']()
+ result = yield f()
self.assertEqual(result, 42)
- @skipBefore35
def test_asyncio_adapter(self):
# This test demonstrates that when using the asyncio coroutine
# runner (i.e. run_until_complete), the to_asyncio_future
def tornado_coroutine():
yield gen.moment
raise gen.Return(42)
- native_coroutine_without_adapter = exec_test(globals(), locals(), """
+
async def native_coroutine_without_adapter():
return await tornado_coroutine()
- """)["native_coroutine_without_adapter"]
- native_coroutine_with_adapter = exec_test(globals(), locals(), """
async def native_coroutine_with_adapter():
return await to_asyncio_future(tornado_coroutine())
- """)["native_coroutine_with_adapter"]
# Use the adapter, but two degrees from the tornado coroutine.
- native_coroutine_with_adapter2 = exec_test(globals(), locals(), """
async def native_coroutine_with_adapter2():
return await to_asyncio_future(native_coroutine_without_adapter())
- """)["native_coroutine_with_adapter2"]
# Tornado supports native coroutines both with and without adapters
self.assertEqual(
42)
-@unittest.skipIf(asyncio is None, "asyncio module not present")
class LeakTest(unittest.TestCase):
def setUp(self):
# Trigger a cleanup of the mapping so we start with a clean slate.
self.assertEqual(new_count, 1)
-@unittest.skipIf(asyncio is None, "asyncio module not present")
class AnyThreadEventLoopPolicyTest(unittest.TestCase):
def setUp(self):
self.orig_policy = asyncio.get_event_loop_policy()
# under the License.
from __future__ import absolute_import, division, print_function
+from concurrent import futures
import logging
import re
import socket
from tornado.iostream import IOStream
from tornado.tcpserver import TCPServer
from tornado.testing import AsyncTestCase, bind_unused_port, gen_test
-from tornado.test.util import unittest, skipBefore35, exec_test
-
-
-try:
- from concurrent import futures
-except ImportError:
- futures = None
+from tornado.test.util import unittest
class MiscFutureTest(AsyncTestCase):
client_class = GeneratorCapClient
-@unittest.skipIf(futures is None, "concurrent.futures module not present")
class RunOnExecutorTest(AsyncTestCase):
@gen_test
def test_no_calling(self):
answer = yield o.f()
self.assertEqual(answer, 42)
- @skipBefore35
@gen_test
def test_async_await(self):
class Object(object):
return 42
o = Object()
- namespace = exec_test(globals(), locals(), """
+
async def f():
answer = await o.f()
return answer
- """)
- result = yield namespace['f']()
+ result = yield f()
self.assertEqual(result, 42)
import datetime
import platform
import sys
-import textwrap
import time
import weakref
from tornado.concurrent import Future
-from tornado.ioloop import IOLoop
from tornado.log import app_log
from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
-from tornado.test.util import unittest, skipOnTravis, skipBefore33, skipBefore35, skipNotCPython, exec_test # noqa: E501
+from tornado.test.util import unittest, skipOnTravis, skipNotCPython
from tornado.web import Application, RequestHandler, HTTPError
from tornado import gen
self.assertEqual(result, 42)
self.finished = True
- @skipBefore33
@gen_test
def test_async_return(self):
- namespace = exec_test(globals(), locals(), """
@gen.coroutine
def f():
yield gen.moment
return 42
- """)
- result = yield namespace['f']()
+ result = yield f()
self.assertEqual(result, 42)
self.finished = True
- @skipBefore33
@gen_test
def test_async_early_return(self):
# A yield statement exists but is not executed, which means
# this function "returns" via an exception. This exception
# doesn't happen before the exception handling is set up.
- namespace = exec_test(globals(), locals(), """
@gen.coroutine
def f():
if True:
return 42
yield gen.Task(self.io_loop.add_callback)
- """)
- result = yield namespace['f']()
+ result = yield f()
self.assertEqual(result, 42)
self.finished = True
- @skipBefore35
@gen_test
def test_async_await(self):
@gen.coroutine
# This test verifies that an async function can await a
# yield-based gen.coroutine, and that a gen.coroutine
# (the test method itself) can yield an async function.
- namespace = exec_test(globals(), locals(), """
async def f2():
result = await f1()
return result
- """)
- result = yield namespace['f2']()
+ result = yield f2()
self.assertEqual(result, 42)
self.finished = True
- @skipBefore35
@gen_test
def test_asyncio_sleep_zero(self):
# asyncio.sleep(0) turns into a special case (equivalent to
# `yield None`)
- namespace = exec_test(globals(), locals(), """
async def f():
import asyncio
await asyncio.sleep(0)
return 42
- """)
- result = yield namespace['f']()
+ result = yield f()
self.assertEqual(result, 42)
self.finished = True
- @skipBefore35
@gen_test
def test_async_await_mixed_multi_native_future(self):
@gen.coroutine
def f1():
yield gen.moment
- namespace = exec_test(globals(), locals(), """
async def f2():
await f1()
return 42
- """)
@gen.coroutine
def f3():
yield gen.moment
raise gen.Return(43)
- results = yield [namespace['f2'](), f3()]
+ results = yield [f2(), f3()]
self.assertEqual(results, [42, 43])
self.finished = True
- @skipBefore35
@gen_test
def test_async_with_timeout(self):
- namespace = exec_test(globals(), locals(), """
async def f1():
return 42
- """)
- result = yield gen.with_timeout(datetime.timedelta(hours=1),
- namespace['f1']())
+ result = yield gen.with_timeout(datetime.timedelta(hours=1), f1())
self.assertEqual(result, 42)
self.finished = True
yield gen.sleep(0.01)
self.finished = True
- @skipBefore33
@gen_test
def test_py3_leak_exception_context(self):
class LeakedException(Exception):
self.assertIs(self.local_ref(), None)
self.finished = True
- @unittest.skipIf(sys.version_info < (3,),
- "test only relevant with asyncio Futures")
def test_asyncio_future_debug_info(self):
self.finished = True
# Enable debug mode
actual = repr(coro)
self.assertIn(expected, actual)
- @unittest.skipIf(asyncio is None, "asyncio module not present")
@gen_test
def test_asyncio_gather(self):
# This demonstrates that tornado coroutines can be understood
class NativeCoroutineHandler(RequestHandler):
- if sys.version_info > (3, 5):
- exec(textwrap.dedent("""
- async def get(self):
- import asyncio
- await asyncio.sleep(0)
- self.write("ok")
- """))
+ async def get(self):
+ await asyncio.sleep(0)
+ self.write("ok")
class GenWebTest(AsyncHTTPTestCase):
response = self.fetch('/async_prepare_error')
self.assertEqual(response.code, 403)
- @skipBefore35
def test_native_coroutine_handler(self):
response = self.fetch('/native_coroutine')
self.assertEqual(response.code, 200)
future)
self.assertEqual(result, 'asdf')
- @unittest.skipIf(futures is None, 'futures module not present')
@gen_test
def test_timeout_concurrent_future(self):
# A concurrent future that does not resolve before the timeout.
yield gen.with_timeout(self.io_loop.time(),
executor.submit(time.sleep, 0.1))
- @unittest.skipIf(futures is None, 'futures module not present')
@gen_test
def test_completed_concurrent_future(self):
# A concurrent future that is resolved before we even submit it
f.result() # wait for completion
yield gen.with_timeout(datetime.timedelta(seconds=3600), f)
- @unittest.skipIf(futures is None, 'futures module not present')
@gen_test
def test_normal_concurrent_future(self):
# A conccurrent future that resolves while waiting for the timeout.
self.assertEqual(g.current_index, 3, 'wrong index')
i += 1
- @skipBefore35
@gen_test
def test_iterator_async_await(self):
# Recreate the previous test with py35 syntax. It's a little clunky
self.finish_coroutines(0, futures)
self.finished = False
- namespace = exec_test(globals(), locals(), """
async def f():
i = 0
g = gen.WaitIterator(*futures)
raise Exception("didn't expect iteration %d" % i)
i += 1
self.finished = True
- """)
- yield namespace['f']()
+ yield f()
self.assertTrue(self.finished)
@gen_test
# Github issue 2229: suspended coroutines should be GCed when
# their loop is closed, even if they're involved in a reference
# cycle.
- if IOLoop.configured_class().__name__.endswith('TwistedIOLoop'):
- raise unittest.SkipTest("Test may fail on TwistedIOLoop")
-
loop = self.get_new_ioloop()
result = []
wfut = []
# coroutine finalizer was called (not on PyPy3 apparently)
self.assertIs(result[-1], None)
- @skipBefore35
def test_gc_infinite_async_await(self):
# Same as test_gc_infinite_coro, but with a `async def` function
import asyncio
- namespace = exec_test(globals(), locals(), """
async def infinite_coro(result):
try:
while True:
finally:
# coroutine finalizer
result.append(None)
- """)
- infinite_coro = namespace['infinite_coro']
loop = self.get_new_ioloop()
result = []
wfut = []
class SyncHTTPClientTest(unittest.TestCase):
def setUp(self):
- if IOLoop.configured_class().__name__ == 'TwistedIOLoop':
- # TwistedIOLoop only supports the global reactor, so we can't have
- # separate IOLoops for client and server threads.
- raise unittest.SkipTest(
- 'Sync HTTPClient not compatible with TwistedIOLoop')
self.server_ioloop = IOLoop()
@gen.coroutine
HTTPInputError,
)
from tornado.escape import utf8, native_str
-from tornado.util import PY3
from tornado.log import gen_log
from tornado.testing import ExpectLog
from tornado.test.util import unittest
import logging
import pickle
import time
-
-if PY3:
- import urllib.parse as urllib_parse
-else:
- import urlparse as urllib_parse
+import urllib.parse
class TestUrlConcat(unittest.TestCase):
def test_parsing(self):
qsstring = "a=1&b=2&a=3"
- qs = urllib_parse.parse_qs(qsstring)
+ qs = urllib.parse.parse_qs(qsstring)
qsl = list(qs_to_qsl(qs))
self.assertIn(('a', '1'), qsl)
self.assertIn(('a', '3'), qsl)
# at startup, which in turn creates the default event loop and prevents forking.
# Explicitly disallow the default event loop so that an error will be raised
# if something tries to touch it.
-try:
- import asyncio
-except ImportError:
- pass
-else:
- asyncio.set_event_loop(None)
+import asyncio
+asyncio.set_event_loop(None)
import tornado.auth
import tornado.autoreload
from __future__ import absolute_import, division, print_function
from concurrent.futures import ThreadPoolExecutor
+from concurrent import futures
import contextlib
import datetime
import functools
import threading
import time
import types
-try:
- from unittest import mock # type: ignore
-except ImportError:
- try:
- import mock # type: ignore
- except ImportError:
- mock = None
+from unittest import mock
from tornado.escape import native_str
from tornado import gen
from tornado.ioloop import IOLoop, TimeoutError, PeriodicCallback
from tornado.log import app_log
from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog, gen_test
-from tornado.test.util import (unittest, skipIfNonUnix, skipOnTravis,
- skipBefore35, exec_test)
-
-try:
- from concurrent import futures
-except ImportError:
- futures = None
-
-try:
- import asyncio
-except ImportError:
- asyncio = None
-
-try:
- import twisted
-except ImportError:
- twisted = None
+from tornado.test.util import unittest, skipIfNonUnix, skipOnTravis
class TestIOLoop(AsyncTestCase):
with ExpectLog(app_log, "Exception in callback"):
self.wait()
- @skipBefore35
def test_exception_logging_native_coro(self):
"""The IOLoop examines exceptions from awaitables and logs them."""
- namespace = exec_test(globals(), locals(), """
async def callback():
# Stop the IOLoop two iterations after raising an exception
# to give the exception time to be logged.
self.io_loop.add_callback(self.io_loop.add_callback, self.stop)
1 / 0
- """)
- self.io_loop.add_callback(namespace["callback"])
+ self.io_loop.add_callback(callback)
with ExpectLog(app_log, "Exception in callback"):
self.wait()
yield e.submit(IOLoop.clear_current)
-@unittest.skipIf(futures is None, "futures module not present")
class TestIOLoopFutures(AsyncTestCase):
def test_add_future_threads(self):
with futures.ThreadPoolExecutor(1) as pool:
self.assertEqual([event1, event2], res)
- @skipBefore35
@gen_test
def test_run_in_executor_native(self):
event1 = threading.Event()
# Go through an async wrapper to ensure that the result of
# run_in_executor works with await and not just gen.coroutine
# (simply passing the underlying concurrrent future would do that).
- namespace = exec_test(globals(), locals(), """
- async def async_wrapper(self_event, other_event):
- return await IOLoop.current().run_in_executor(
- None, sync_func, self_event, other_event)
- """)
+ async def async_wrapper(self_event, other_event):
+ return await IOLoop.current().run_in_executor(
+ None, sync_func, self_event, other_event)
res = yield [
- namespace["async_wrapper"](event1, event2),
- namespace["async_wrapper"](event2, event1)
+ async_wrapper(event1, event2),
+ async_wrapper(event2, event1)
]
self.assertEqual([event1, event2], res)
yield gen.sleep(1)
self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=0.01)
- @skipBefore35
def test_native_coroutine(self):
@gen.coroutine
def f1():
yield gen.moment
- namespace = exec_test(globals(), locals(), """
async def f2():
await f1()
- """)
- self.io_loop.run_sync(namespace['f2'])
+ self.io_loop.run_sync(f2)
class TestPeriodicCallbackMath(unittest.TestCase):
self.assertEqual(self.simulate_calls(pc, [-100, 0, 0]),
[1010, 1020, 1030])
- @unittest.skipIf(mock is None, 'mock package not present')
def test_jitter(self):
random_times = [0.5, 1, 0, 0.75]
expected = [1010, 1022.5, 1030, 1041.25]
cls = self.run_python('print(classname(IOLoop()))')
self.assertEqual(cls, 'AsyncIOLoop')
- @unittest.skipIf(asyncio is not None,
- "IOLoop configuration not available")
- def test_explicit_select(self):
- # SelectIOLoop can always be configured explicitly.
- default_class = self.run_python(
- 'IOLoop.configure("tornado.platform.select.SelectIOLoop")',
- 'print(classname(IOLoop.current()))')
- self.assertEqual(default_class, 'SelectIOLoop')
-
- @unittest.skipIf(asyncio is None, "asyncio module not present")
def test_asyncio(self):
cls = self.run_python(
'IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")',
'print(classname(IOLoop.current()))')
self.assertEqual(cls, 'AsyncIOMainLoop')
- @unittest.skipIf(asyncio is None, "asyncio module not present")
def test_asyncio_main(self):
cls = self.run_python(
'from tornado.platform.asyncio import AsyncIOMainLoop',
'print(classname(IOLoop.current()))')
self.assertEqual(cls, 'AsyncIOMainLoop')
- @unittest.skipIf(twisted is None, "twisted module not present")
- @unittest.skipIf(asyncio is not None,
- "IOLoop configuration not available")
- def test_twisted(self):
- cls = self.run_python(
- 'from tornado.platform.twisted import TwistedIOLoop',
- 'TwistedIOLoop().install()',
- 'print(classname(IOLoop.current()))')
- self.assertEqual(cls, 'TwistedIOLoop')
-
if __name__ == "__main__":
unittest.main()
import socket
import ssl
import sys
-
-try:
- from unittest import mock # type: ignore
-except ImportError:
- try:
- import mock # type: ignore
- except ImportError:
- mock = None
+from unittest import mock
def _server_ssl_options():
# cygwin's errnos don't match those used on native windows python
self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED)
- @unittest.skipIf(mock is None, 'mock package not present')
@gen_test
def test_gaierror(self):
# Test that IOStream sets its exc_info on getaddrinfo error.
yield stream.connect(('localhost', 80))
self.assertTrue(isinstance(stream.error, socket.gaierror))
- @unittest.skipIf(mock is None, 'mock package not present')
@gen_test
def test_read_until_close_with_error(self):
server, client = yield self.make_iostream_pair()
import tornado.locale
from tornado.escape import utf8, to_unicode
-from tornado.test.util import unittest, skipOnAppEngine
+from tornado.test.util import unittest
from tornado.util import unicode_type
self.assertTrue(isinstance(locale, tornado.locale.CSVLocale))
self.assertEqual(locale.translate("school"), u"\u00e9cole")
- # tempfile.mkdtemp is not available on app engine.
- @skipOnAppEngine
def test_csv_bom(self):
with open(os.path.join(os.path.dirname(__file__), 'csv_translations',
'fr_FR.csv'), 'rb') as f:
from tornado import gen, locks
from tornado.gen import TimeoutError
from tornado.testing import gen_test, AsyncTestCase
-from tornado.test.util import unittest, skipBefore35, exec_test
+from tornado.test.util import unittest
class ConditionTest(AsyncTestCase):
# Semaphore was released and can be acquired again.
self.assertTrue(sem.acquire().done())
- @skipBefore35
@gen_test
def test_context_manager_async_await(self):
# Repeat the above test using 'async with'.
sem = locks.Semaphore()
- namespace = exec_test(globals(), locals(), """
async def f():
async with sem as yielded:
self.assertTrue(yielded is None)
- """)
- yield namespace['f']()
+ yield f()
# Semaphore was released and can be acquired again.
self.assertTrue(sem.acquire().done())
yield futures
self.assertEqual(list(range(N)), history)
- @skipBefore35
@gen_test
def test_acquire_fifo_async_with(self):
# Repeat the above test using `async with lock:`
N = 5
history = []
- namespace = exec_test(globals(), locals(), """
async def f(idx):
async with lock:
history.append(idx)
- """)
- futures = [namespace['f'](i) for i in range(N)]
+ futures = [f(i) for i in range(N)]
lock.release()
yield futures
self.assertEqual(list(range(N)), history)
from tornado.testing import AsyncTestCase, gen_test, bind_unused_port
from tornado.test.util import unittest, skipIfNoNetwork
-try:
- from concurrent import futures
-except ImportError:
- futures = None
-
try:
import pycares # type: ignore
except ImportError:
@skipIfNoNetwork
-@unittest.skipIf(futures is None, "futures module not present")
class ThreadedResolverTest(AsyncTestCase, _ResolverTestMixin):
def setUp(self):
super(ThreadedResolverTest, self).setUp()
@skipIfNoNetwork
-@unittest.skipIf(futures is None, "futures module not present")
@unittest.skipIf(sys.platform == 'win32', "preexec_fn not available on win32")
class ThreadedResolverImportTest(unittest.TestCase):
def test_import(self):
from __future__ import absolute_import, division, print_function
import datetime
+from io import StringIO
import os
import sys
+from unittest import mock
from tornado.options import OptionParser, Error
-from tornado.util import basestring_type, PY3
+from tornado.util import basestring_type
from tornado.test.util import unittest, subTest
-if PY3:
- from io import StringIO
-else:
- from cStringIO import StringIO
-
-try:
- # py33+
- from unittest import mock # type: ignore
-except ImportError:
- try:
- import mock # type: ignore
- except ImportError:
- mock = None
-
class Email(object):
def __init__(self, value):
self.assertEqual({}, options.group_dict('nonexistent'))
- @unittest.skipIf(mock is None, 'mock package not present')
def test_mock_patch(self):
# ensure that our setattr hooks don't interfere with mock.patch
options = OptionParser()
from __future__ import absolute_import, division, print_function
+import asyncio
import logging
import os
import signal
from tornado.test.util import unittest, skipIfNonUnix
from tornado.web import RequestHandler, Application
-try:
- import asyncio
-except ImportError:
- asyncio = None
-
-
-def skip_if_twisted():
- if IOLoop.configured_class().__name__.endswith('TwistedIOLoop'):
- raise unittest.SkipTest("Process tests not compatible with TwistedIOLoop")
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
-
-
@skipIfNonUnix
class ProcessTest(unittest.TestCase):
def get_app(self):
# reactor and don't restore it to a sane state after the fork
# (asyncio has the same issue, but we have a special case in
# place for it).
- skip_if_twisted()
with ExpectLog(gen_log, "(Starting .* processes|child .* exited|uncaught exception)"):
sock, port = bind_unused_port()
def test_stderr(self):
# This test is mysteriously flaky on twisted: it succeeds, but logs
# an error of EBADF on closing a file descriptor.
- skip_if_twisted()
subproc = Subprocess([sys.executable, '-u', '-c',
r"import sys; sys.stderr.write('hello\n')"],
stderr=Subprocess.STREAM)
subproc.stderr.close()
def test_sigchild(self):
- # Twisted's SIGCHLD handler and Subprocess's conflict with each other.
- skip_if_twisted()
Subprocess.initialize()
self.addCleanup(Subprocess.uninitialize)
subproc = Subprocess([sys.executable, '-c', 'pass'])
@gen_test
def test_sigchild_future(self):
- skip_if_twisted()
Subprocess.initialize()
self.addCleanup(Subprocess.uninitialize)
subproc = Subprocess([sys.executable, '-c', 'pass'])
self.assertEqual(subproc.returncode, ret)
def test_sigchild_signal(self):
- skip_if_twisted()
Subprocess.initialize()
self.addCleanup(Subprocess.uninitialize)
subproc = Subprocess([sys.executable, '-c',
@gen_test
def test_wait_for_exit_raise(self):
- skip_if_twisted()
Subprocess.initialize()
self.addCleanup(Subprocess.uninitialize)
subproc = Subprocess([sys.executable, '-c', 'import sys; sys.exit(1)'])
@gen_test
def test_wait_for_exit_raise_disabled(self):
- skip_if_twisted()
Subprocess.initialize()
self.addCleanup(Subprocess.uninitialize)
subproc = Subprocess([sys.executable, '-c', 'import sys; sys.exit(1)'])
from tornado import gen, queues
from tornado.gen import TimeoutError
from tornado.testing import gen_test, AsyncTestCase
-from tornado.test.util import unittest, skipBefore35, exec_test
+from tornado.test.util import unittest
class QueueBasicTest(AsyncTestCase):
for getter in getters:
self.assertRaises(TimeoutError, getter.result)
- @skipBefore35
@gen_test
def test_async_for(self):
q = queues.Queue()
for i in range(5):
q.put(i)
- namespace = exec_test(globals(), locals(), """
async def f():
results = []
async for i in q:
results.append(i)
if i == 4:
return results
- """)
- results = yield namespace['f']()
+ results = yield f()
self.assertEqual(results, list(range(5)))
from __future__ import absolute_import, division, print_function
+from functools import reduce
import gc
import io
import locale # system locale module, not tornado.locale
from tornado.httpclient import AsyncHTTPClient
from tornado.httpserver import HTTPServer
-from tornado.ioloop import IOLoop
from tornado.netutil import Resolver
-from tornado.options import define, options, add_parse_callback
+from tornado.options import define, add_parse_callback
from tornado.test.util import unittest
-try:
- reduce # py2
-except NameError:
- from functools import reduce # py3
TEST_MODULES = [
'tornado.httputil.doctests',
class LogCounter(logging.Filter):
"""Counts the number of WARNING or higher log records."""
def __init__(self, *args, **kwargs):
- # Can't use super() because logging.Filter is an old-style class in py26
- logging.Filter.__init__(self, *args, **kwargs)
+ super(LogCounter, self).__init__(*args, **kwargs)
self.info_count = self.warning_count = self.error_count = 0
def filter(self, record):
s, defaults=dict(allow_ipv6=False)))
define('httpserver', type=str, default=None,
callback=HTTPServer.configure)
- define('ioloop', type=str, default=None)
- define('ioloop_time_monotonic', default=False)
define('resolver', type=str, default=None,
callback=Resolver.configure)
define('debug_gc', type=str, multiple=True,
define('locale', type=str, default=None,
callback=lambda x: locale.setlocale(locale.LC_ALL, x))
- def configure_ioloop():
- kwargs = {}
- if options.ioloop_time_monotonic:
- from tornado.platform.auto import monotonic_time
- if monotonic_time is None:
- raise RuntimeError("monotonic clock not found")
- kwargs['time_func'] = monotonic_time
- if options.ioloop or kwargs:
- IOLoop.configure(options.ioloop, **kwargs)
- add_parse_callback(configure_ioloop)
-
log_counter = LogCounter()
add_parse_callback(
lambda: logging.getLogger().handlers[0].addFilter(log_counter))
import tornado.testing
kwargs = {}
- if sys.version_info >= (3, 2):
- # HACK: unittest.main will make its own changes to the warning
- # configuration, which may conflict with the settings above
- # or command-line flags like -bb. Passing warnings=False
- # suppresses this behavior, although this looks like an implementation
- # detail. http://bugs.python.org/issue15626
- kwargs['warnings'] = False
+
+ # HACK: unittest.main will make its own changes to the warning
+ # configuration, which may conflict with the settings above
+ # or command-line flags like -bb. Passing warnings=False
+ # suppresses this behavior, although this looks like an implementation
+ # detail. http://bugs.python.org/issue15626
+ kwargs['warnings'] = False
+
kwargs['testRunner'] = test_runner_factory(orig_stderr)
try:
tornado.testing.main(**kwargs)
from tornado.test import httpclient_test
from tornado.testing import (AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase,
ExpectLog, gen_test)
-from tornado.test.util import skipOnTravis, skipIfNoIPv6, refusing_port, skipBefore35, exec_test
+from tornado.test.util import skipOnTravis, skipIfNoIPv6, refusing_port
from tornado.web import RequestHandler, Application, url, stream_request_body
response.rethrow()
self.assertEqual(response.body, b"12345678")
- @skipBefore35
def test_native_body_producer_chunked(self):
- namespace = exec_test(globals(), locals(), """
async def body_producer(write):
await write(b'1234')
import asyncio
await asyncio.sleep(0)
await write(b'5678')
- """)
response = self.fetch("/echo_post", method="POST",
- body_producer=namespace["body_producer"])
+ body_producer=body_producer)
response.rethrow()
self.assertEqual(response.body, b"12345678")
- @skipBefore35
def test_native_body_producer_content_length(self):
- namespace = exec_test(globals(), locals(), """
async def body_producer(write):
await write(b'1234')
import asyncio
await asyncio.sleep(0)
await write(b'5678')
- """)
response = self.fetch("/echo_post", method="POST",
- body_producer=namespace["body_producer"],
+ body_producer=body_producer,
headers={'Content-Length': '8'})
response.rethrow()
self.assertEqual(response.body, b"12345678")
from tornado.iostream import IOStream
from tornado.log import app_log
from tornado.tcpserver import TCPServer
-from tornado.test.util import skipBefore35, skipIfNonUnix, exec_test, unittest
+from tornado.test.util import skipIfNonUnix, unittest
from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
if client is not None:
client.close()
- @skipBefore35
@gen_test
def test_handle_stream_native_coroutine(self):
# handle_stream may be a native coroutine.
- namespace = exec_test(globals(), locals(), """
class TestServer(TCPServer):
async def handle_stream(self, stream, address):
stream.write(b'data')
stream.close()
- """)
sock, port = bind_unused_port()
- server = namespace['TestServer']()
+ server = TestServer()
server.add_socket(sock)
client = IOStream(socket.socket())
yield client.connect(('localhost', port))
from tornado import gen, ioloop
from tornado.httpserver import HTTPServer
-from tornado.test.util import unittest, skipBefore35, exec_test
+from tornado.test.util import unittest
from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, bind_unused_port, gen_test
from tornado.web import Application
+import asyncio
import contextlib
import os
import platform
import traceback
import warnings
-try:
- import asyncio
-except ImportError:
- asyncio = None
-
@contextlib.contextmanager
def set_environ(name, value):
self.assertEqual(len(result.errors), 1)
self.assertIn("should be decorated", result.errors[0][1])
- @skipBefore35
@unittest.skipIf(platform.python_implementation() == 'PyPy',
'pypy destructor warnings cannot be silenced')
def test_undecorated_coroutine(self):
- namespace = exec_test(globals(), locals(), """
class Test(AsyncTestCase):
async def test_coro(self):
pass
- """)
- test_class = namespace['Test']
- test = test_class('test_coro')
+ test = Test('test_coro')
result = unittest.TestResult()
# Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
test_with_kwargs(self, test='test')
self.finished = True
- @skipBefore35
def test_native_coroutine(self):
- namespace = exec_test(globals(), locals(), """
@gen_test
async def test(self):
self.finished = True
- """)
-
- namespace['test'](self)
+ test(self)
- @skipBefore35
def test_native_coroutine_timeout(self):
# Set a short timeout and exceed it.
- namespace = exec_test(globals(), locals(), """
@gen_test(timeout=0.1)
async def test(self):
await gen.sleep(1)
- """)
try:
- namespace['test'](self)
+ test(self)
self.fail("did not get expected exception")
except ioloop.TimeoutError:
self.finished = True
-@unittest.skipIf(asyncio is None, "asyncio module not present")
class GetNewIOLoopTest(AsyncTestCase):
def get_new_ioloop(self):
# Use the current loop instead of creating a new one here.
skipOnTravis = unittest.skipIf('TRAVIS' in os.environ,
'timing tests unreliable on travis')
-skipOnAppEngine = unittest.skipIf('APPENGINE_RUNTIME' in os.environ,
- 'not available on Google App Engine')
-
# Set the environment variable NO_NETWORK=1 to disable any tests that
# depend on an external network.
skipIfNoNetwork = unittest.skipIf('NO_NETWORK' in os.environ,
'network access disabled')
-skipBefore33 = unittest.skipIf(sys.version_info < (3, 3), 'PEP 380 (yield from) not available')
-skipBefore35 = unittest.skipIf(sys.version_info < (3, 5), 'PEP 492 (async/await) not available')
skipNotCPython = unittest.skipIf(platform.python_implementation() != 'CPython',
'Not CPython implementation')
# coding: utf-8
from __future__ import absolute_import, division, print_function
+from io import StringIO
import re
import sys
import datetime
from tornado.test.util import unittest
from tornado.util import (
raise_exc_info, Configurable, exec_in, ArgReplacer,
- timedelta_to_seconds, import_object, re_unescape, is_finalizing, PY3,
+ timedelta_to_seconds, import_object, re_unescape, is_finalizing
)
-if PY3:
- from io import StringIO
-else:
- from cStringIO import StringIO
-
class RaiseExcInfoTest(unittest.TestCase):
def test_two_arg_exception(self):
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.template import DictLoader
from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
-from tornado.test.util import unittest, skipBefore35, exec_test
-from tornado.util import ObjectDict, unicode_type, PY3
+from tornado.test.util import unittest
+from tornado.util import ObjectDict, unicode_type
from tornado.web import (
Application, RequestHandler, StaticFileHandler, RedirectHandler as WebRedirectHandler,
HTTPError, MissingArgumentError, ErrorHandler, authenticated, url,
import os
import re
import socket
-
-if PY3:
- import urllib.parse as urllib_parse # py3
-else:
- import urllib as urllib_parse # py2
-
-wsgi_safe_tests = []
+import urllib.parse
def relpath(*a):
return os.path.join(os.path.dirname(__file__), *a)
-def wsgi_safe(cls):
- wsgi_safe_tests.append(cls)
- return cls
-
-
class WebTestCase(AsyncHTTPTestCase):
"""Base class for web tests that also supports WSGI mode.
Override get_handlers and get_app_kwargs instead of get_app.
- Append to wsgi_safe to have it run in wsgi_test as well.
+ This class is deprecated since WSGI mode is no longer supported.
"""
def get_app(self):
self.app = Application(self.get_handlers(), **self.get_app_kwargs())
body=self.get_body_arguments("foo")))
-# This test is shared with wsgi_test.py
-@wsgi_safe
+# This test was shared with wsgi_test.py; now the name is meaningless.
class WSGISafeWebTest(WebTestCase):
COOKIE_SECRET = "WebTest.COOKIE_SECRET"
# Test merging of query and body arguments.
# In singular form, body arguments take precedence over query arguments.
- body = urllib_parse.urlencode(dict(foo="hello"))
+ body = urllib.parse.urlencode(dict(foo="hello"))
response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
self.assertEqual(response.body, b"hello")
# In plural methods they are merged.
def test_get_query_arguments(self):
# send as a post so we can ensure the separation between query
# string and body arguments.
- body = urllib_parse.urlencode(dict(foo="hello"))
+ body = urllib.parse.urlencode(dict(foo="hello"))
response = self.fetch("/get_argument?source=query&foo=bar",
method="POST", body=body)
self.assertEqual(response.body, b"bar")
self.assertEqual(response.body, b"default")
def test_get_body_arguments(self):
- body = urllib_parse.urlencode(dict(foo="bar"))
+ body = urllib.parse.urlencode(dict(foo="bar"))
response = self.fetch("/get_argument?source=body&foo=hello",
method="POST", body=body)
self.assertEqual(response.body, b"bar")
- body = urllib_parse.urlencode(dict(foo=""))
+ body = urllib.parse.urlencode(dict(foo=""))
response = self.fetch("/get_argument?source=body&foo=hello",
method="POST", body=body)
self.assertEqual(response.body, b"")
- body = urllib_parse.urlencode(dict())
+ body = urllib.parse.urlencode(dict())
response = self.fetch("/get_argument?source=body&foo=hello",
method="POST", body=body)
self.assertEqual(response.body, b"default")
self.assertEqual(response.body, b"ok")
-@wsgi_safe
class ErrorResponseTest(WebTestCase):
def get_handlers(self):
class DefaultHandler(RequestHandler):
self.assertEqual(b"", response.body)
-@wsgi_safe
class StaticFileTest(WebTestCase):
# The expected MD5 hash of robots.txt, used in tests that call
# StaticFileHandler.get_version
# that the stricter validation in 4.2.1 doesn't break them.
path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'static/robots.txt')
- response = self.get_and_head('/root_static' + urllib_parse.quote(path))
+ response = self.get_and_head('/root_static' + urllib.parse.quote(path))
self.assertEqual(response.code, 200)
-@wsgi_safe
class StaticDefaultFilenameTest(WebTestCase):
def get_app_kwargs(self):
return dict(static_path=relpath('static'),
self.assertTrue(response.headers['Location'].endswith('/static/dir/'))
-@wsgi_safe
class StaticFileWithPathTest(WebTestCase):
def get_app_kwargs(self):
return dict(static_path=relpath('static'),
self.assertEqual(response.body, b"H\xc3\xa9llo\n")
-@wsgi_safe
class CustomStaticFileTest(WebTestCase):
def get_handlers(self):
class MyStaticFileHandler(StaticFileHandler):
self.assertEqual(response.body, b"/static/foo.42.txt")
-@wsgi_safe
class HostMatchingTest(WebTestCase):
class Handler(RequestHandler):
def initialize(self, reply):
self.assertEqual(response.body, b"[3]")
-@wsgi_safe
class DefaultHostMatchingTest(WebTestCase):
def get_handlers(self):
return []
self.assertEqual(response.body, b"[2]")
-@wsgi_safe
class NamedURLSpecGroupsTest(WebTestCase):
def get_handlers(self):
class EchoHandler(RequestHandler):
self.assertEqual(response.body, b"bar")
-@wsgi_safe
class ClearHeaderTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.assertNotIn("Transfer-Encoding", response.headers)
-@wsgi_safe
class Header304Test(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.assertTrue("Transfer-Encoding" not in response2.headers)
-@wsgi_safe
class StatusReasonTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.assertEqual(response.reason, "Unknown")
-@wsgi_safe
class DateHeaderTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
datetime.timedelta(seconds=2))
-@wsgi_safe
class RaiseWithReasonTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.assertEqual(str(HTTPError(682)), "HTTP 682: Unknown")
-@wsgi_safe
class ErrorHandlerXSRFTest(WebTestCase):
def get_handlers(self):
# note that if the handlers list is empty we get the default_host
self.assertEqual(response.code, 404)
-@wsgi_safe
class GzipTestCase(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
['Accept-Language', 'Cookie', 'Accept-Encoding'])
-@wsgi_safe
class PathArgsInPrepareTest(WebTestCase):
class Handler(RequestHandler):
def prepare(self):
self.assertEqual(data, {'args': [], 'kwargs': {'path': 'foo'}})
-@wsgi_safe
class ClearAllCookiesTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
pass
-@wsgi_safe
class ExceptionHandlerTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.assertEqual(response.code, 403)
-@wsgi_safe
class BuggyLoggingTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.fetch('/')
-@wsgi_safe
class UIMethodUIModuleTest(SimpleHandlerTestCase):
"""Test that UI methods and modules are created correctly and
associated with the handler.
b'In MyModule(123) with handler value asdf.')
-@wsgi_safe
class GetArgumentErrorTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
'log_message': 'Missing argument foo'})
-@wsgi_safe
class SetLazyPropertiesTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def prepare(self):
self.assertEqual(response.body, b'Hello Ben (en_US)')
-@wsgi_safe
class GetCurrentUserTest(WebTestCase):
def get_app_kwargs(self):
class WithoutUserModule(UIModule):
self.assertEqual(response.body, b'True')
-@wsgi_safe
class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
pass
class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase):
- # wsgiref.validate complains about unknown methods in a way that makes
- # this test not wsgi_safe.
class Handler(RequestHandler):
def other(self):
# Even though this method exists, it won't get called automatically
self.assertEqual(response.code, 405)
-@wsgi_safe
class AllHTTPMethodsTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def method(self):
self.assertEqual(response.body, b'other')
-@wsgi_safe
class FinishInPrepareTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def prepare(self):
self.assertEqual(response.body, b'done')
-@wsgi_safe
class Default404Test(WebTestCase):
def get_handlers(self):
# If there are no handlers at all a default redirect handler gets added.
b'<body>404: Not Found</body></html>')
-@wsgi_safe
class Custom404Test(WebTestCase):
def get_handlers(self):
return [('/foo', RequestHandler)]
self.assertEqual(response.body, b'custom 404 response')
-@wsgi_safe
class DefaultHandlerArgumentsTest(WebTestCase):
def get_handlers(self):
return [('/foo', RequestHandler)]
self.assertEqual(response.code, 403)
-@wsgi_safe
class HandlerByNameTest(WebTestCase):
def get_handlers(self):
# All three are equivalent.
return [('/', DecoratedFlowControlHandler, dict(test=self))]
-@skipBefore35
class NativeStreamingRequestFlowControlTest(
BaseStreamingRequestFlowControlTest,
WebTestCase):
def get_handlers(self):
class NativeFlowControlHandler(BaseFlowControlHandler):
- data_received = exec_test(globals(), locals(), """
async def data_received(self, data):
with self.in_method('data_received'):
import asyncio
await asyncio.sleep(0)
- """)["data_received"]
return [('/', NativeFlowControlHandler, dict(test=self))]
-@wsgi_safe
class IncorrectContentLengthTest(SimpleHandlerTestCase):
def get_handlers(self):
test = self
self.assertEqual(1, key_version)
-@wsgi_safe
class XSRFTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
response = self.fetch(
"/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)))
+ body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)))
self.assertEqual(response.code, 403)
def test_xsrf_fail_argument_invalid_format(self):
response = self.fetch(
"/", method="POST",
headers=self.cookie_headers(),
- body=urllib_parse.urlencode(dict(_xsrf='3|')))
+ body=urllib.parse.urlencode(dict(_xsrf='3|')))
self.assertEqual(response.code, 403)
def test_xsrf_fail_cookie_invalid_format(self):
response = self.fetch(
"/", method="POST",
headers=self.cookie_headers(token='3|'),
- body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)))
+ body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)))
self.assertEqual(response.code, 403)
def test_xsrf_fail_cookie_no_body(self):
def test_xsrf_success_short_token(self):
response = self.fetch(
"/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf='deadbeef')),
+ body=urllib.parse.urlencode(dict(_xsrf='deadbeef')),
headers=self.cookie_headers(token='deadbeef'))
self.assertEqual(response.code, 200)
def test_xsrf_success_non_hex_token(self):
response = self.fetch(
"/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf='xoxo')),
+ body=urllib.parse.urlencode(dict(_xsrf='xoxo')),
headers=self.cookie_headers(token='xoxo'))
self.assertEqual(response.code, 200)
def test_xsrf_success_post_body(self):
response = self.fetch(
"/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
+ body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
headers=self.cookie_headers())
self.assertEqual(response.code, 200)
def test_xsrf_success_query_string(self):
response = self.fetch(
- "/?" + urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
+ "/?" + urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
method="POST", body=b"",
headers=self.cookie_headers())
self.assertEqual(response.code, 200)
for token in (self.xsrf_token, token2):
response = self.fetch(
"/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=token)),
+ body=urllib.parse.urlencode(dict(_xsrf=token)),
headers=self.cookie_headers(token))
self.assertEqual(response.code, 200)
# Sending one in the cookie and the other in the body is not allowed.
with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
response = self.fetch(
"/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=body_token)),
+ body=urllib.parse.urlencode(dict(_xsrf=body_token)),
headers=self.cookie_headers(cookie_token))
self.assertEqual(response.code, 403)
tokens_seen.add(token)
response = self.fetch(
"/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
+ body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
headers=self.cookie_headers(token))
self.assertEqual(response.code, 200)
self.assertEqual(len(tokens_seen), 6)
(v2_token, v1_token)):
response = self.fetch(
"/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=body_token)),
+ body=urllib.parse.urlencode(dict(_xsrf=body_token)),
headers=self.cookie_headers(cookie_token))
self.assertEqual(response.code, 200)
-@wsgi_safe
class XSRFCookieKwargsTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.assertIn('httponly;', response.headers['Set-Cookie'].lower())
-@wsgi_safe
class FinishExceptionTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.assertEqual(b'authentication required', response.body)
-@wsgi_safe
class DecoratorTest(WebTestCase):
def get_handlers(self):
class RemoveSlashHandler(RequestHandler):
self.assertEqual(response.headers['Location'], "/addslash/?foo=bar")
-@wsgi_safe
class CacheTest(WebTestCase):
def get_handlers(self):
class EtagHandler(RequestHandler):
self.assertEqual(response.code, status_code)
-@wsgi_safe
class RequestSummaryTest(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
from __future__ import absolute_import, division, print_function
import functools
-import sys
import traceback
from tornado.concurrent import Future
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.template import DictLoader
from tornado.testing import AsyncHTTPTestCase, gen_test, bind_unused_port, ExpectLog
-from tornado.test.util import unittest, skipBefore35, exec_test
+from tornado.test.util import unittest
from tornado.web import Application, RequestHandler
try:
yield self.close(ws)
-if sys.version_info >= (3, 5):
- NativeCoroutineOnMessageHandler = exec_test(globals(), locals(), """
class NativeCoroutineOnMessageHandler(TestWebSocketHandler):
def initialize(self, close_future, compression_options=None):
super().initialize(close_future, compression_options)
self.sleeping += 1
await gen.sleep(0.01)
self.sleeping -= 1
- self.write_message(message)""")['NativeCoroutineOnMessageHandler']
+ self.write_message(message)
class WebSocketNativeCoroutineTest(WebSocketBaseTestCase):
('/native', NativeCoroutineOnMessageHandler,
dict(close_future=self.close_future))])
- @skipBefore35
@gen_test
def test_native_coroutine(self):
ws = yield self.ws_connect('/native')