define('family', default='unspec',
help='Address family to query: unspec, inet, or inet6')
-@gen.engine
+@gen.coroutine
def main():
args = parse_command_line()
print('%s: %s' % (resolver.__class__.__name__,
pprint.pformat(addrinfo)))
print()
- IOLoop.instance().stop()
if __name__ == '__main__':
- IOLoop.instance().add_callback(main)
- IOLoop.instance().start()
+ IOLoop.instance().run_sync(main)
from __future__ import absolute_import, division, print_function, with_statement
+import functools
import time
import weakref
if async_client_class is None:
async_client_class = AsyncHTTPClient
self._async_client = async_client_class(self._io_loop, **kwargs)
- self._response = None
self._closed = False
def __del__(self):
If an error occurs during the fetch, we raise an `HTTPError`.
"""
- def callback(response):
- self._response = response
- self._io_loop.stop()
- self._io_loop.add_callback(self._async_client.fetch, request,
- callback, **kwargs)
- self._io_loop.start()
- response = self._response
- self._response = None
+ response = self._io_loop.run_sync(functools.partial(
+ self._async_client.fetch, request, **kwargs))
response.rethrow()
return response
from tornado.platform.auto import set_close_exec, Waker
+class TimeoutError(Exception):
+ pass
+
+
class IOLoop(Configurable):
"""A level-triggered I/O loop.
def make_current(self):
IOLoop._current.instance = self
- def clear_current(self):
- assert IOLoop._current.instance is self
+ @staticmethod
+ def clear_current():
IOLoop._current.instance = None
@classmethod
"""
raise NotImplementedError()
+ def run_sync(self, func, timeout=None):
+ """Starts the `IOLoop`, runs the given function, and stops the loop.
+
+ If the function returns a `Future`, the `IOLoop` will run until
+ the future is resolved. If it raises an exception, the `IOLoop`
+ will stop and the exception will be re-raised to the caller.
+
+ The keyword-only argument ``timeout`` may be used to set
+ a maximum duration for the function. If the timeout expires,
+ a `TimeoutError` is raised.
+
+ This method is useful in conjunction with `tornado.gen.coroutine`
+ to allow asynchronous calls in a `main()` function::
+
+ @gen.coroutine
+ def main():
+ # do stuff...
+
+ if __name__ == '__main__':
+ IOLoop.instance().run_sync(main)
+ """
+ future_cell = [None]
+ def run():
+ try:
+ result = func()
+ except Exception as e:
+ future_cell[0] = Future()
+ future_cell[0].set_exception(e)
+ else:
+ if isinstance(result, Future):
+ future_cell[0] = result
+ else:
+ future_cell[0] = Future()
+ future_cell[0].set_result(result)
+ self.add_future(future_cell[0], lambda future: self.stop())
+ self.add_callback(run)
+ if timeout is not None:
+ timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
+ self.start()
+ if timeout is not None:
+ self.remove_timeout(timeout_handle)
+ if not future_cell[0].done():
+ raise TimeoutError('Operation timed out after %s seconds' % timeout)
+ return future_cell[0].result()
+
+
def time(self):
"""Returns the current time according to the IOLoop's clock.
import threading
import time
-from tornado.ioloop import IOLoop
+from tornado import gen
+from tornado.ioloop import IOLoop, TimeoutError
from tornado.stack_context import ExceptionStackContext, StackContext, wrap, NullContext
from tornado.testing import AsyncTestCase, bind_unused_port
from tornado.test.util import unittest, skipIfNonUnix
self.assertEqual(self.future.exception().args[0], "worker")
+class TestIOLoopRunSync(unittest.TestCase):
+ def setUp(self):
+ self.io_loop = IOLoop()
+
+ def tearDown(self):
+ self.io_loop.close()
+
+ def test_sync_result(self):
+ self.assertEqual(self.io_loop.run_sync(lambda: 42), 42)
+
+ def test_sync_exception(self):
+ with self.assertRaises(ZeroDivisionError):
+ self.io_loop.run_sync(lambda: 1 / 0)
+
+ def test_async_result(self):
+ @gen.coroutine
+ def f():
+ yield gen.Task(self.io_loop.add_callback)
+ raise gen.Return(42)
+ self.assertEqual(self.io_loop.run_sync(f), 42)
+
+ def test_async_exception(self):
+ @gen.coroutine
+ def f():
+ yield gen.Task(self.io_loop.add_callback)
+ 1 / 0
+ with self.assertRaises(ZeroDivisionError):
+ self.io_loop.run_sync(f)
+
+ def test_current(self):
+ def f():
+ self.assertIs(IOLoop.current(), self.io_loop)
+ self.io_loop.run_sync(f)
+
+ def test_timeout(self):
+ @gen.coroutine
+ def f():
+ yield gen.Task(self.io_loop.add_timeout, self.io_loop.time() + 1)
+ self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=0.01)
+
if __name__ == "__main__":
unittest.main()
def gen_test(f):
- """Testing equivalent of ``@gen.engine``, to be applied to test methods.
+ """Testing equivalent of ``@gen.coroutine``, to be applied to test methods.
- ``@gen.engine`` cannot be used on tests because the `IOLoop` is not
+ ``@gen.coroutine`` cannot be used on tests because the `IOLoop` is not
already running. ``@gen_test`` should be applied to test methods
on subclasses of `AsyncTestCase`.
- Note that unlike most uses of ``@gen.engine``, ``@gen_test`` can
- detect automatically when the function finishes cleanly so there
- is no need to run a callback to signal completion.
-
Example::
class MyTest(AsyncHTTPTestCase):
@gen_test
response = yield gen.Task(self.fetch('/'))
"""
+ f = gen.coroutine(f)
@functools.wraps(f)
- def wrapper(self, *args, **kwargs):
- result = f(self, *args, **kwargs)
- if result is None:
- return
- assert isinstance(result, types.GeneratorType)
- runner = gen.Runner(result, lambda value: self.stop())
- runner.run()
- self.wait()
+ def wrapper(self):
+ return self.io_loop.run_sync(functools.partial(f, self), timeout=5)
return wrapper