]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Reduce duplication in exec-based tests.
authorBen Darnell <ben@bendarnell.com>
Mon, 3 Aug 2015 01:21:19 +0000 (21:21 -0400)
committerBen Darnell <ben@bendarnell.com>
Mon, 3 Aug 2015 01:44:46 +0000 (21:44 -0400)
tornado/test/gen_test.py
tornado/test/locks_test.py
tornado/test/util.py

index 752cc8f9c00b1f0ceb7f687602de72ad6d62ba59..ecb972bec89199bd835dce109a76e8e51d10b2c0 100644 (file)
@@ -6,7 +6,6 @@ import functools
 import sys
 import textwrap
 import time
-import platform
 import weakref
 
 from tornado.concurrent import return_future, Future
@@ -16,7 +15,7 @@ from tornado.ioloop import IOLoop
 from tornado.log import app_log
 from tornado import stack_context
 from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
-from tornado.test.util import unittest, skipOnTravis
+from tornado.test.util import unittest, skipOnTravis, skipBefore33, skipBefore35, skipNotCPython, exec_test
 from tornado.web import Application, RequestHandler, asynchronous, HTTPError
 
 from tornado import gen
@@ -26,11 +25,6 @@ try:
 except ImportError:
     futures = None
 
-skipBefore33 = unittest.skipIf(sys.version_info < (3, 3), 'PEP 380 (yield from) not available')
-skipBefore35 = unittest.skipIf(sys.version_info < (3, 5), 'PEP 492 (async/await) not available')
-skipNotCPython = unittest.skipIf(platform.python_implementation() != 'CPython',
-                                 'Not CPython implementation')
-
 
 class GenEngineTest(AsyncTestCase):
     def setUp(self):
@@ -694,19 +688,13 @@ class GenCoroutineTest(AsyncTestCase):
     @skipBefore33
     @gen_test
     def test_async_return(self):
-        # It is a compile-time error to return a value in a generator
-        # before Python 3.3, so we must test this with exec.
-        # Flatten the real global and local namespace into our fake globals:
-        # it's all global from the perspective of f().
-        global_namespace = dict(globals(), **locals())
-        local_namespace = {}
-        exec(textwrap.dedent("""
+        namespace = exec_test(globals(), locals(), """
         @gen.coroutine
         def f():
             yield gen.Task(self.io_loop.add_callback)
             return 42
-        """), global_namespace, local_namespace)
-        result = yield local_namespace['f']()
+        """)
+        result = yield namespace['f']()
         self.assertEqual(result, 42)
         self.finished = True
 
@@ -716,16 +704,14 @@ class GenCoroutineTest(AsyncTestCase):
         # A yield statement exists but is not executed, which means
         # this function "returns" via an exception.  This exception
         # doesn't happen before the exception handling is set up.
-        global_namespace = dict(globals(), **locals())
-        local_namespace = {}
-        exec(textwrap.dedent("""
+        namespace = exec_test(globals(), locals(), """
         @gen.coroutine
         def f():
             if True:
                 return 42
             yield gen.Task(self.io_loop.add_callback)
-        """), global_namespace, local_namespace)
-        result = yield local_namespace['f']()
+        """)
+        result = yield namespace['f']()
         self.assertEqual(result, 42)
         self.finished = True
 
@@ -735,34 +721,30 @@ class GenCoroutineTest(AsyncTestCase):
         # This test verifies that an async function can await a
         # yield-based gen.coroutine, and that a gen.coroutine
         # (the test method itself) can yield an async function.
-        global_namespace = dict(globals(), **locals())
-        local_namespace = {}
-        exec(textwrap.dedent("""
+        namespace = exec_test(globals(), locals(), """
         async def f():
             await gen.Task(self.io_loop.add_callback)
             return 42
-        """), global_namespace, local_namespace)
-        result = yield local_namespace['f']()
+        """)
+        result = yield namespace['f']()
         self.assertEqual(result, 42)
         self.finished = True
 
     @skipBefore35
     @gen_test
     def test_async_await_mixed_multi(self):
-        global_namespace = dict(globals(), **locals())
-        local_namespace = {}
-        exec(textwrap.dedent("""
+        namespace = exec_test(globals(), locals(), """
         async def f1():
             await gen.Task(self.io_loop.add_callback)
             return 42
-        """), global_namespace, local_namespace)
+        """)
 
         @gen.coroutine
         def f2():
             yield gen.Task(self.io_loop.add_callback)
             raise gen.Return(43)
 
-        results = yield [local_namespace['f1'](), f2()]
+        results = yield [namespace['f1'](), f2()]
         self.assertEqual(results, [42, 43])
         self.finished = True
 
index b7b630970c91f6a8effbbc8bb197a6a5a21fc77f..43a6d5162cf20bb84cb5b7c096f5a041764f8133 100644 (file)
 # under the License.
 
 from datetime import timedelta
-import sys
-import textwrap
 
 from tornado import gen, locks
 from tornado.gen import TimeoutError
 from tornado.testing import gen_test, AsyncTestCase
-from tornado.test.util import unittest
-
-skipBefore35 = unittest.skipIf(sys.version_info < (3, 5), 'PEP 492 (async/await) not available')
+from tornado.test.util import unittest, skipBefore35, exec_test
 
 
 class ConditionTest(AsyncTestCase):
@@ -338,14 +334,12 @@ class SemaphoreContextManagerTest(AsyncTestCase):
         # Repeat the above test using 'async with'.
         sem = locks.Semaphore()
 
-        global_namespace = dict(globals(), **locals())
-        local_namespace = {}
-        exec(textwrap.dedent("""
+        namespace = exec_test(globals(), locals(), """
         async def f():
             async with sem as yielded:
                 self.assertTrue(yielded is None)
-        """), global_namespace, local_namespace)
-        yield local_namespace['f']()
+        """)
+        yield namespace['f']()
 
         # Semaphore was released and can be acquired again.
         self.assertTrue(sem.acquire().done())
@@ -475,14 +469,12 @@ class LockTests(AsyncTestCase):
         N = 5
         history = []
 
-        global_namespace = dict(globals(), **locals())
-        local_namespace = {}
-        exec(textwrap.dedent("""
+        namespace = exec_test(globals(), locals(), """
         async def f(idx):
             async with lock:
                 history.append(idx)
-        """), global_namespace, local_namespace)
-        futures = [local_namespace['f'](i) for i in range(N)]
+        """)
+        futures = [namespace['f'](i) for i in range(N)]
         lock.release()
         yield futures
         self.assertEqual(list(range(N)), history)
index 9dd9c0ce12d42b099b708fe1033c9edcfda70829..b8feff4fa91c115344e63ca723e84c084b25a896 100644 (file)
@@ -1,8 +1,10 @@
 from __future__ import absolute_import, division, print_function, with_statement
 
 import os
+import platform
 import socket
 import sys
+import textwrap
 
 from tornado.testing import bind_unused_port
 
@@ -32,6 +34,12 @@ skipIfNoNetwork = unittest.skipIf('NO_NETWORK' in os.environ,
 skipIfNoIPv6 = unittest.skipIf(not socket.has_ipv6, 'ipv6 support not present')
 
 
+skipBefore33 = unittest.skipIf(sys.version_info < (3, 3), 'PEP 380 (yield from) not available')
+skipBefore35 = unittest.skipIf(sys.version_info < (3, 5), 'PEP 492 (async/await) not available')
+skipNotCPython = unittest.skipIf(platform.python_implementation() != 'CPython',
+                                 'Not CPython implementation')
+
+
 def refusing_port():
     """Returns a local port number that will refuse all connections.
 
@@ -50,3 +58,18 @@ def refusing_port():
     conn.close()
     server_socket.close()
     return (client_socket.close, client_addr[1])
+
+
+def exec_test(caller_globals, caller_locals, s):
+    """Execute ``s`` in a given context and return the result namespace.
+
+    Used to define functions for tests in particular python
+    versions that would be syntax errors in older versions.
+    """
+    # Flatten the real global and local namespace into our fake
+    # globals: it's all global from the perspective of code defined
+    # in s.
+    global_namespace = dict(caller_globals, **caller_locals)
+    local_namespace = {}
+    exec(textwrap.dedent(s), global_namespace, local_namespace)
+    return local_namespace