From e4ead597956457aada766b09d624a3d9f7b888d9 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 4 Sep 2011 00:59:57 -0700 Subject: [PATCH] Add tornado.gen module for simpler generator-based async code. --- tornado/gen.py | 250 +++++++++++++++++++++++++++++++++++ tornado/test/gen_test.py | 158 ++++++++++++++++++++++ tornado/test/runtests.py | 1 + website/sphinx/gen.rst | 22 +++ website/sphinx/utilities.rst | 1 + 5 files changed, 432 insertions(+) create mode 100644 tornado/gen.py create mode 100644 tornado/test/gen_test.py create mode 100644 website/sphinx/gen.rst diff --git a/tornado/gen.py b/tornado/gen.py new file mode 100644 index 000000000..ca72fe22d --- /dev/null +++ b/tornado/gen.py @@ -0,0 +1,250 @@ +"""``tornado.gen`` is a generator-based interface to make it easier to +work in an asynchronous environment. Code using the ``gen`` module +is technically asynchronous, but it is written as a single generator +instead of a collection of separate functions. + +For example, the following asynchronous handler:: + + class AsyncHandler(RequestHandler): + @asynchronous + def get(self): + http_client = AsyncHTTPClient() + http_client.fetch("http://example.com", + callback=self.on_fetch) + + def on_fetch(self, response): + do_something_with_response(response) + self.render("template.html") + +could be written with ``gen`` as:: + + class GenAsyncHandler(RequestHandler): + @asynchronous + @gen.engine + def get(self): + http_client = AsyncHTTPClient() + response = yield gen.Task(http_client.fetch("http://example.com")) + do_something_with_response(response) + self.render("template.html") + +`Task` works with any function that takes a ``callback`` keyword argument +(and runs that callback with zero or one arguments). For more complicated +interfaces, `Task` can be split into two parts: `Callback` and `Wait`:: + + class GenAsyncHandler2(RequestHandler): + @asynchronous + @gen.engine + def get(self): + http_client = AsyncHTTPClient() + http_client.fetch("http://example.com", + callback=(yield gen.Callback("key")) + response = yield gen.Wait("key") + do_something_with_response(response) + self.render("template.html") + +The ``key`` argument to `Callback` and `Wait` allows for multiple +asynchronous operations to proceed in parallel: yield several +callbacks with different keys, then wait for them once all the async +operations have started. +""" + +import functools +import types + +class KeyReuseError(Exception): pass +class UnknownKeyError(Exception): pass +class LeakedCallbackError(Exception): pass +class BadYieldError(Exception): pass + +def engine(func): + """Decorator for asynchronous generators. + + Any generator that yields objects from this module must be wrapped + in this decorator. The decorator only works on functions that are + already asynchronous. For `~tornado.web.RequestHandler` + ``get``/``post``/etc methods, this means that both the `tornado.gen.engine` + and `tornado.web.asynchronous` decorators must be used (in either order). + In most other cases, it means that it doesn't make sense to use + ``gen.engine`` on functions that don't already take a callback argument. + """ + @functools.wraps(func) + def wrapper(*args, **kwargs): + gen = func(*args, **kwargs) + if isinstance(gen, types.GeneratorType): + Runner(gen).run() + return + assert gen is None, gen + # no yield, so we're done + return wrapper + +class YieldPoint(object): + """Base class for objects that may be yielded from the generator.""" + def start(self, runner): + """Called by the runner after the generator has yielded. + + No other methods will be called on this object before ``start``. + """ + raise NotImplementedError() + + def is_ready(self): + """Called by the runner to determine whether to resume the generator. + + May be called repeatedly until it returns True. + """ + raise NotImplementedError() + + def get_result(self): + """Returns the value to use as the result of the yield expression. + + This method will only be called once, and only after `is_ready` + has returned true. + """ + raise NotImplementedError() + +class Callback(YieldPoint): + """Returns a callable object that will allow a matching `Wait` to proceed. + + The key may be any value suitable for use as a dictionary key, and is + used to match ``Callbacks`` to their corresponding ``Waits``. The key + must be unique among outstanding callbacks within a single run of the + generator function, but may be reused across different runs of the same + function (so constants generally work fine). + + The callback may be called with zero or one arguments; if an argument + is given it will be returned by `Wait`. + """ + def __init__(self, key): + self.key = key + + def start(self, runner): + self.runner = runner + runner.register_callback(self.key) + + def is_ready(self): + return True + + def get_result(self): + return self.callback + + def callback(self, arg=None): + self.runner.set_result(self.key, arg) + +class Wait(YieldPoint): + """Returns the argument passed to the result of a previous `Callback`.""" + def __init__(self, key): + self.key = key + + def start(self, runner): + self.runner = runner + + def is_ready(self): + return self.runner.is_ready(self.key) + + def get_result(self): + return self.runner.pop_result(self.key) + +class Task(YieldPoint): + """Runs a single asynchronous operation. + + Takes a function (and optional additional arguments) and runs it with + those arguments plus a ``callback`` keyword argument. The argument passed + to the callback is returned as the result of the yield expression. + + A `Task` is equivalent to a `Callback`/`Wait` pair (with a unique + key generated automatically):: + + result = yield gen.Task(func, args) + + func(args, callback=(yield gen.Callback(key))) + result = yield gen.Wait(key) + """ + def __init__(self, func, *args, **kwargs): + assert "callback" not in kwargs + kwargs["callback"] = self.callback + self.func = functools.partial(func, *args, **kwargs) + + def start(self, runner): + self.runner = runner + self.key = object() + runner.register_callback(self.key) + self.func() + + def is_ready(self): + return self.runner.is_ready(self.key) + + def get_result(self): + return self.runner.pop_result(self.key) + + def callback(self, arg=None): + self.runner.set_result(self.key, arg) + +class _NullYieldPoint(YieldPoint): + def start(self, runner): + pass + def is_ready(self): + return True + def get_result(self): + return None + +class Runner(object): + """Internal implementation of `tornado.gen.engine`. + + Maintains information about pending callbacks and their results. + """ + def __init__(self, gen): + self.gen = gen + self.yield_point = _NullYieldPoint() + self.pending_callbacks = set() + self.results = {} + self.waiting = None + self.running = False + + def register_callback(self, key): + """Adds ``key`` to the list of callbacks.""" + if key in self.pending_callbacks: + raise KeyReuseError("key %r is already pending" % key) + self.pending_callbacks.add(key) + + def is_ready(self, key): + """Returns true if a result is available for ``key``.""" + if key not in self.pending_callbacks: + raise UnknownKeyError("key %r is not pending" % key) + return key in self.results + + def set_result(self, key, result): + """Sets the result for ``key`` and attempts to resume the generator.""" + self.results[key] = result + self.run() + + def pop_result(self, key): + """Returns the result for ``key`` and unregisters it.""" + self.pending_callbacks.remove(key) + return self.results.pop(key) + + def run(self): + """Starts or resumes the generator, running until it reaches a + yield point that is not ready. + """ + if self.running: + return + try: + self.running = True + while True: + if not self.yield_point.is_ready(): + return + next = self.yield_point.get_result() + try: + yielded = self.gen.send(next) + except StopIteration: + if self.pending_callbacks: + raise LeakedCallbackError( + "finished without waiting for callbacks %r" % + self.pending_callbacks) + return + if not isinstance(yielded, YieldPoint): + raise BadYieldError("yielded unknown object %r" % yielded) + self.yield_point = yielded + self.yield_point.start(self) + finally: + self.running = False + diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py new file mode 100644 index 000000000..1c1767a22 --- /dev/null +++ b/tornado/test/gen_test.py @@ -0,0 +1,158 @@ +from tornado.escape import url_escape +from tornado.httpclient import AsyncHTTPClient +from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, LogTrapTestCase +from tornado.util import b +from tornado.web import Application, RequestHandler, asynchronous + +from tornado import gen + +class GenTest(AsyncTestCase): + def run_gen(self, f): + f() + self.wait() + + def test_no_yield(self): + @gen.engine + def f(): + self.stop() + self.run_gen(f) + + def test_inline_cb(self): + @gen.engine + def f(): + (yield gen.Callback("k1"))() + res = yield gen.Wait("k1") + assert res is None + self.stop() + self.run_gen(f) + + def test_ioloop_cb(self): + @gen.engine + def f(): + self.io_loop.add_callback((yield gen.Callback("k1"))) + yield gen.Wait("k1") + self.stop() + self.run_gen(f) + + def test_exception_phase1(self): + @gen.engine + def f(): + 1/0 + self.assertRaises(ZeroDivisionError, self.run_gen, f) + + def test_exception_phase2(self): + @gen.engine + def f(): + self.io_loop.add_callback((yield gen.Callback("k1"))) + yield gen.Wait("k1") + 1/0 + self.assertRaises(ZeroDivisionError, self.run_gen, f) + + def test_with_arg(self): + @gen.engine + def f(): + (yield gen.Callback("k1"))(42) + res = yield gen.Wait("k1") + self.assertEqual(42, res) + self.stop() + self.run_gen(f) + + def test_key_reuse(self): + @gen.engine + def f(): + yield gen.Callback("k1") + yield gen.Callback("k1") + self.stop() + self.assertRaises(gen.KeyReuseError, self.run_gen, f) + + def test_key_mismatch(self): + @gen.engine + def f(): + yield gen.Callback("k1") + yield gen.Wait("k2") + self.stop() + self.assertRaises(gen.UnknownKeyError, self.run_gen, f) + + def test_leaked_callback(self): + @gen.engine + def f(): + yield gen.Callback("k1") + self.stop() + self.assertRaises(gen.LeakedCallbackError, self.run_gen, f) + + def test_parallel_callback(self): + @gen.engine + def f(): + for k in range(3): + self.io_loop.add_callback((yield gen.Callback(k))) + yield gen.Wait(1) + self.io_loop.add_callback((yield gen.Callback(3))) + yield gen.Wait(0) + yield gen.Wait(3) + yield gen.Wait(2) + self.stop() + self.run_gen(f) + + def test_bogus_yield(self): + @gen.engine + def f(): + yield 42 + self.assertRaises(gen.BadYieldError, self.run_gen, f) + + def test_reuse(self): + @gen.engine + def f(): + self.io_loop.add_callback((yield gen.Callback(0))) + yield gen.Wait(0) + self.stop() + self.run_gen(f) + self.run_gen(f) + + def test_task(self): + @gen.engine + def f(): + yield gen.Task(self.io_loop.add_callback) + self.stop() + self.run_gen(f) + + +class GenSequenceHandler(RequestHandler): + @asynchronous + @gen.engine + def get(self): + self.io_loop = self.request.connection.stream.io_loop + self.io_loop.add_callback((yield gen.Callback("k1"))) + yield gen.Wait("k1") + self.write("1") + self.io_loop.add_callback((yield gen.Callback("k2"))) + yield gen.Wait("k2") + self.write("2") + # reuse an old key + self.io_loop.add_callback((yield gen.Callback("k1"))) + yield gen.Wait("k1") + self.finish("3") + +class GenTaskHandler(RequestHandler): + @asynchronous + @gen.engine + def get(self): + io_loop = self.request.connection.stream.io_loop + client = AsyncHTTPClient(io_loop=io_loop) + response = yield gen.Task(client.fetch, self.get_argument('url')) + response.rethrow() + self.finish(b("got response: ") + response.body) + +class GenWebTest(AsyncHTTPTestCase, LogTrapTestCase): + def get_app(self): + return Application([ + ('/sequence', GenSequenceHandler), + ('/task', GenTaskHandler), + ]) + + def test_sequence_handler(self): + response = self.fetch('/sequence') + self.assertEqual(response.body, b("123")) + + def test_task_handler(self): + response = self.fetch('/task?url=%s' % url_escape(self.get_url('/sequence'))) + self.assertEqual(response.body, b("got response: 123")) diff --git a/tornado/test/runtests.py b/tornado/test/runtests.py index efca8e6d3..14f161a2b 100755 --- a/tornado/test/runtests.py +++ b/tornado/test/runtests.py @@ -7,6 +7,7 @@ TEST_MODULES = [ 'tornado.util.doctests', 'tornado.test.curl_httpclient_test', 'tornado.test.escape_test', + 'tornado.test.gen_test', 'tornado.test.httpclient_test', 'tornado.test.httpserver_test', 'tornado.test.httputil_test', diff --git a/website/sphinx/gen.rst b/website/sphinx/gen.rst new file mode 100644 index 000000000..648cbb9f7 --- /dev/null +++ b/website/sphinx/gen.rst @@ -0,0 +1,22 @@ +``tornado.gen`` --- Simplify asynchronous code +============================================== + +.. automodule:: tornado.gen + + Decorator + --------- + + .. autofunction:: engine + + Yield points + ------------ + + Instances of the following classes may be used in yield expressions + in the generator. + + .. autoclass:: Task + + .. autoclass:: Callback + + .. autoclass:: Wait + diff --git a/website/sphinx/utilities.rst b/website/sphinx/utilities.rst index f007bc609..775e46c29 100644 --- a/website/sphinx/utilities.rst +++ b/website/sphinx/utilities.rst @@ -4,6 +4,7 @@ Utilities .. toctree:: autoreload + gen httputil options process -- 2.47.3