]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add tornado.gen module for simpler generator-based async code.
authorBen Darnell <ben@bendarnell.com>
Sun, 4 Sep 2011 07:59:57 +0000 (00:59 -0700)
committerBen Darnell <ben@bendarnell.com>
Sun, 4 Sep 2011 07:59:57 +0000 (00:59 -0700)
tornado/gen.py [new file with mode: 0644]
tornado/test/gen_test.py [new file with mode: 0644]
tornado/test/runtests.py
website/sphinx/gen.rst [new file with mode: 0644]
website/sphinx/utilities.rst

diff --git a/tornado/gen.py b/tornado/gen.py
new file mode 100644 (file)
index 0000000..ca72fe2
--- /dev/null
@@ -0,0 +1,250 @@
+"""``tornado.gen`` is a generator-based interface to make it easier to
+work in an asynchronous environment.  Code using the ``gen`` module
+is technically asynchronous, but it is written as a single generator
+instead of a collection of separate functions.
+
+For example, the following asynchronous handler::
+
+    class AsyncHandler(RequestHandler):
+        @asynchronous
+        def get(self):
+            http_client = AsyncHTTPClient()
+            http_client.fetch("http://example.com",
+                              callback=self.on_fetch)
+
+        def on_fetch(self, response):
+            do_something_with_response(response)
+            self.render("template.html")
+
+could be written with ``gen`` as::
+
+    class GenAsyncHandler(RequestHandler):
+        @asynchronous
+        @gen.engine
+        def get(self):
+            http_client = AsyncHTTPClient()
+            response = yield gen.Task(http_client.fetch("http://example.com"))
+            do_something_with_response(response)
+            self.render("template.html")
+
+`Task` works with any function that takes a ``callback`` keyword argument
+(and runs that callback with zero or one arguments).  For more complicated
+interfaces, `Task` can be split into two parts: `Callback` and `Wait`::
+
+    class GenAsyncHandler2(RequestHandler):
+        @asynchronous
+        @gen.engine
+        def get(self):
+            http_client = AsyncHTTPClient()
+            http_client.fetch("http://example.com",
+                              callback=(yield gen.Callback("key"))
+            response = yield gen.Wait("key")
+            do_something_with_response(response)
+            self.render("template.html")
+
+The ``key`` argument to `Callback` and `Wait` allows for multiple
+asynchronous operations to proceed in parallel: yield several
+callbacks with different keys, then wait for them once all the async
+operations have started.
+"""
+
+import functools
+import types
+
+class KeyReuseError(Exception): pass
+class UnknownKeyError(Exception): pass
+class LeakedCallbackError(Exception): pass
+class BadYieldError(Exception): pass
+
+def engine(func):
+    """Decorator for asynchronous generators.
+
+    Any generator that yields objects from this module must be wrapped
+    in this decorator.  The decorator only works on functions that are
+    already asynchronous.  For `~tornado.web.RequestHandler`
+    ``get``/``post``/etc methods, this means that both the `tornado.gen.engine`
+    and `tornado.web.asynchronous` decorators must be used (in either order).
+    In most other cases, it means that it doesn't make sense to use
+    ``gen.engine`` on functions that don't already take a callback argument.
+    """
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        gen = func(*args, **kwargs)
+        if isinstance(gen, types.GeneratorType):
+            Runner(gen).run()
+            return
+        assert gen is None, gen
+        # no yield, so we're done
+    return wrapper
+
+class YieldPoint(object):
+    """Base class for objects that may be yielded from the generator."""
+    def start(self, runner):
+        """Called by the runner after the generator has yielded.
+        
+        No other methods will be called on this object before ``start``.
+        """
+        raise NotImplementedError()
+
+    def is_ready(self):
+        """Called by the runner to determine whether to resume the generator.
+
+        May be called repeatedly until it returns True.
+        """
+        raise NotImplementedError()
+
+    def get_result(self):
+        """Returns the value to use as the result of the yield expression.
+        
+        This method will only be called once, and only after `is_ready`
+        has returned true.
+        """
+        raise NotImplementedError()
+
+class Callback(YieldPoint):
+    """Returns a callable object that will allow a matching `Wait` to proceed.
+
+    The key may be any value suitable for use as a dictionary key, and is
+    used to match ``Callbacks`` to their corresponding ``Waits``.  The key
+    must be unique among outstanding callbacks within a single run of the
+    generator function, but may be reused across different runs of the same
+    function (so constants generally work fine).
+
+    The callback may be called with zero or one arguments; if an argument
+    is given it will be returned by `Wait`.
+    """
+    def __init__(self, key):
+        self.key = key
+
+    def start(self, runner):
+        self.runner = runner
+        runner.register_callback(self.key)
+
+    def is_ready(self):
+        return True
+
+    def get_result(self):
+        return self.callback
+
+    def callback(self, arg=None):
+        self.runner.set_result(self.key, arg)
+
+class Wait(YieldPoint):
+    """Returns the argument passed to the result of a previous `Callback`."""
+    def __init__(self, key):
+        self.key = key
+
+    def start(self, runner):
+        self.runner = runner
+
+    def is_ready(self):
+        return self.runner.is_ready(self.key)
+
+    def get_result(self):
+        return self.runner.pop_result(self.key)
+
+class Task(YieldPoint):
+    """Runs a single asynchronous operation.
+
+    Takes a function (and optional additional arguments) and runs it with
+    those arguments plus a ``callback`` keyword argument.  The argument passed
+    to the callback is returned as the result of the yield expression.
+
+    A `Task` is equivalent to a `Callback`/`Wait` pair (with a unique
+    key generated automatically)::
+    
+        result = yield gen.Task(func, args)
+        
+        func(args, callback=(yield gen.Callback(key)))
+        result = yield gen.Wait(key)
+    """
+    def __init__(self, func, *args, **kwargs):
+        assert "callback" not in kwargs
+        kwargs["callback"] = self.callback
+        self.func = functools.partial(func, *args, **kwargs)
+
+    def start(self, runner):
+        self.runner = runner
+        self.key = object()
+        runner.register_callback(self.key)
+        self.func()
+    
+    def is_ready(self):
+        return self.runner.is_ready(self.key)
+
+    def get_result(self):
+        return self.runner.pop_result(self.key)
+
+    def callback(self, arg=None):
+        self.runner.set_result(self.key, arg)
+
+class _NullYieldPoint(YieldPoint):
+    def start(self, runner):
+        pass
+    def is_ready(self):
+        return True
+    def get_result(self):
+        return None
+
+class Runner(object):
+    """Internal implementation of `tornado.gen.engine`.
+
+    Maintains information about pending callbacks and their results.
+    """
+    def __init__(self, gen):
+        self.gen = gen
+        self.yield_point = _NullYieldPoint()
+        self.pending_callbacks = set()
+        self.results = {}
+        self.waiting = None
+        self.running = False
+
+    def register_callback(self, key):
+        """Adds ``key`` to the list of callbacks."""
+        if key in self.pending_callbacks:
+            raise KeyReuseError("key %r is already pending" % key)
+        self.pending_callbacks.add(key)
+
+    def is_ready(self, key):
+        """Returns true if a result is available for ``key``."""
+        if key not in self.pending_callbacks:
+            raise UnknownKeyError("key %r is not pending" % key)
+        return key in self.results
+
+    def set_result(self, key, result):
+        """Sets the result for ``key`` and attempts to resume the generator."""
+        self.results[key] = result
+        self.run()
+
+    def pop_result(self, key):
+        """Returns the result for ``key`` and unregisters it."""
+        self.pending_callbacks.remove(key)
+        return self.results.pop(key)
+
+    def run(self):
+        """Starts or resumes the generator, running until it reaches a
+        yield point that is not ready.
+        """
+        if self.running:
+            return
+        try:
+            self.running = True
+            while True:
+                if not self.yield_point.is_ready():
+                    return
+                next = self.yield_point.get_result()
+                try:
+                    yielded = self.gen.send(next)
+                except StopIteration:
+                    if self.pending_callbacks:
+                        raise LeakedCallbackError(
+                            "finished without waiting for callbacks %r" %
+                            self.pending_callbacks)
+                    return
+                if not isinstance(yielded, YieldPoint):
+                    raise BadYieldError("yielded unknown object %r" % yielded)
+                self.yield_point = yielded
+                self.yield_point.start(self)
+        finally:
+            self.running = False
+
diff --git a/tornado/test/gen_test.py b/tornado/test/gen_test.py
new file mode 100644 (file)
index 0000000..1c1767a
--- /dev/null
@@ -0,0 +1,158 @@
+from tornado.escape import url_escape
+from tornado.httpclient import AsyncHTTPClient
+from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, LogTrapTestCase
+from tornado.util import b
+from tornado.web import Application, RequestHandler, asynchronous
+
+from tornado import gen
+
+class GenTest(AsyncTestCase):
+    def run_gen(self, f):
+        f()
+        self.wait()
+
+    def test_no_yield(self):
+        @gen.engine
+        def f():
+            self.stop()
+        self.run_gen(f)
+
+    def test_inline_cb(self):
+        @gen.engine
+        def f():
+            (yield gen.Callback("k1"))()
+            res = yield gen.Wait("k1")
+            assert res is None
+            self.stop()
+        self.run_gen(f)
+
+    def test_ioloop_cb(self):
+        @gen.engine
+        def f():
+            self.io_loop.add_callback((yield gen.Callback("k1")))
+            yield gen.Wait("k1")
+            self.stop()
+        self.run_gen(f)
+
+    def test_exception_phase1(self):
+        @gen.engine
+        def f():
+            1/0
+        self.assertRaises(ZeroDivisionError, self.run_gen, f)
+
+    def test_exception_phase2(self):
+        @gen.engine
+        def f():
+            self.io_loop.add_callback((yield gen.Callback("k1")))
+            yield gen.Wait("k1")
+            1/0
+        self.assertRaises(ZeroDivisionError, self.run_gen, f)
+
+    def test_with_arg(self):
+        @gen.engine
+        def f():
+            (yield gen.Callback("k1"))(42)
+            res = yield gen.Wait("k1")
+            self.assertEqual(42, res)
+            self.stop()
+        self.run_gen(f)
+
+    def test_key_reuse(self):
+        @gen.engine
+        def f():
+            yield gen.Callback("k1")
+            yield gen.Callback("k1")
+            self.stop()
+        self.assertRaises(gen.KeyReuseError, self.run_gen, f)
+
+    def test_key_mismatch(self):
+        @gen.engine
+        def f():
+            yield gen.Callback("k1")
+            yield gen.Wait("k2")
+            self.stop()
+        self.assertRaises(gen.UnknownKeyError, self.run_gen, f)
+
+    def test_leaked_callback(self):
+        @gen.engine
+        def f():
+            yield gen.Callback("k1")
+            self.stop()
+        self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)
+
+    def test_parallel_callback(self):
+        @gen.engine
+        def f():
+            for k in range(3):
+                self.io_loop.add_callback((yield gen.Callback(k)))
+            yield gen.Wait(1)
+            self.io_loop.add_callback((yield gen.Callback(3)))
+            yield gen.Wait(0)
+            yield gen.Wait(3)
+            yield gen.Wait(2)
+            self.stop()
+        self.run_gen(f)
+
+    def test_bogus_yield(self):
+        @gen.engine
+        def f():
+            yield 42
+        self.assertRaises(gen.BadYieldError, self.run_gen, f)
+
+    def test_reuse(self):
+        @gen.engine
+        def f():
+            self.io_loop.add_callback((yield gen.Callback(0)))
+            yield gen.Wait(0)
+            self.stop()
+        self.run_gen(f)
+        self.run_gen(f)
+
+    def test_task(self):
+        @gen.engine
+        def f():
+            yield gen.Task(self.io_loop.add_callback)
+            self.stop()
+        self.run_gen(f)
+
+
+class GenSequenceHandler(RequestHandler):
+    @asynchronous
+    @gen.engine
+    def get(self):
+        self.io_loop = self.request.connection.stream.io_loop
+        self.io_loop.add_callback((yield gen.Callback("k1")))
+        yield gen.Wait("k1")
+        self.write("1")
+        self.io_loop.add_callback((yield gen.Callback("k2")))
+        yield gen.Wait("k2")
+        self.write("2")
+        # reuse an old key
+        self.io_loop.add_callback((yield gen.Callback("k1")))
+        yield gen.Wait("k1")
+        self.finish("3")
+
+class GenTaskHandler(RequestHandler):
+    @asynchronous
+    @gen.engine
+    def get(self):
+        io_loop = self.request.connection.stream.io_loop
+        client = AsyncHTTPClient(io_loop=io_loop)
+        response = yield gen.Task(client.fetch, self.get_argument('url'))
+        response.rethrow()
+        self.finish(b("got response: ") + response.body)
+
+class GenWebTest(AsyncHTTPTestCase, LogTrapTestCase):
+    def get_app(self):
+        return Application([
+                ('/sequence', GenSequenceHandler),
+                ('/task', GenTaskHandler),
+                ])
+
+    def test_sequence_handler(self):
+        response = self.fetch('/sequence')
+        self.assertEqual(response.body, b("123"))
+
+    def test_task_handler(self):
+        response = self.fetch('/task?url=%s' % url_escape(self.get_url('/sequence')))
+        self.assertEqual(response.body, b("got response: 123"))
index efca8e6d3010114292609c914c1083b410c32837..14f161a2b0a953166dbb0f85f64a2126362a33b5 100755 (executable)
@@ -7,6 +7,7 @@ TEST_MODULES = [
     'tornado.util.doctests',
     'tornado.test.curl_httpclient_test',
     'tornado.test.escape_test',
+    'tornado.test.gen_test',
     'tornado.test.httpclient_test',
     'tornado.test.httpserver_test',
     'tornado.test.httputil_test',
diff --git a/website/sphinx/gen.rst b/website/sphinx/gen.rst
new file mode 100644 (file)
index 0000000..648cbb9
--- /dev/null
@@ -0,0 +1,22 @@
+``tornado.gen`` --- Simplify asynchronous code
+==============================================
+
+.. automodule:: tornado.gen
+
+   Decorator
+   ---------
+
+   .. autofunction:: engine
+
+   Yield points
+   ------------
+
+   Instances of the following classes may be used in yield expressions
+   in the generator.  
+
+   .. autoclass:: Task
+
+   .. autoclass:: Callback
+
+   .. autoclass:: Wait
+
index f007bc609522c2ef0ef50a0d01fcfd031e6740c3..775e46c2937c9d7b8d6a364eaa596a31f83ccb32 100644 (file)
@@ -4,6 +4,7 @@ Utilities
 .. toctree::
 
    autoreload
+   gen
    httputil
    options
    process