]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Rewrite chatdemo to use a coroutine instead of callbacks.
authorBen Darnell <ben@bendarnell.com>
Sat, 5 Jul 2014 21:55:44 +0000 (17:55 -0400)
committerBen Darnell <ben@bendarnell.com>
Sat, 5 Jul 2014 21:55:44 +0000 (17:55 -0400)
demos/chat/chatdemo.py

index a8f5bb81f9c2d36eddefa9d6422bd824d1a1978a..7e3af039a6d04833c0dec0b4ce2ddcc379c9672f 100755 (executable)
@@ -22,10 +22,12 @@ import tornado.web
 import os.path
 import uuid
 
+from tornado.concurrent import Future
 from tornado import gen
 from tornado.options import define, options, parse_command_line
 
 define("port", default=8888, help="run on the given port", type=int)
+define("debug", default=False, help="run in debug mode")
 
 
 class MessageBuffer(object):
@@ -34,7 +36,12 @@ class MessageBuffer(object):
         self.cache = []
         self.cache_size = 200
 
-    def wait_for_messages(self, callback, cursor=None):
+    def wait_for_messages(self, cursor=None):
+        # Construct a Future to return to our caller.  This allows
+        # wait_for_messages to be yielded from a coroutine even though
+        # it is not a coroutine itself.  We will set the result of the
+        # Future when results are available.
+        result_future = Future()
         if cursor:
             new_count = 0
             for msg in reversed(self.cache):
@@ -42,20 +49,20 @@ class MessageBuffer(object):
                     break
                 new_count += 1
             if new_count:
-                callback(self.cache[-new_count:])
-                return
-        self.waiters.add(callback)
+                result_future.set_result(callback(self.cache[-new_count:]))
+                return result_future
+        self.waiters.add(result_future)
+        return result_future
 
-    def cancel_wait(self, callback):
-        self.waiters.remove(callback)
+    def cancel_wait(self, future):
+        self.waiters.remove(future)
+        # Set an empty result to unblock any coroutines waiting.
+        future.set_result([])
 
     def new_messages(self, messages):
         logging.info("Sending new message to %r listeners", len(self.waiters))
-        for callback in self.waiters:
-            try:
-                callback(messages)
-            except:
-                logging.error("Error in waiter callback", exc_info=True)
+        for future in self.waiters:
+            future.set_result(messages)
         self.waiters = set()
         self.cache.extend(messages)
         if len(self.cache) > self.cache_size:
@@ -100,20 +107,19 @@ class MessageNewHandler(BaseHandler):
 
 class MessageUpdatesHandler(BaseHandler):
     @tornado.web.authenticated
-    @tornado.web.asynchronous
+    @gen.coroutine
     def post(self):
         cursor = self.get_argument("cursor", None)
-        global_message_buffer.wait_for_messages(self.on_new_messages,
-                                                cursor=cursor)
-
-    def on_new_messages(self, messages):
-        # Closed client connection
+        # Save the future returned by wait_for_messages so we can cancel
+        # it in wait_for_messages
+        self.future = global_message_buffer.wait_for_messages(cursor=cursor)
+        messages = yield self.future
         if self.request.connection.stream.closed():
             return
-        self.finish(dict(messages=messages))
+        self.write(dict(messages=messages))
 
     def on_connection_close(self):
-        global_message_buffer.cancel_wait(self.on_new_messages)
+        global_message_buffer.cancel_wait(self.future)
 
 
 class AuthLoginHandler(BaseHandler, tornado.auth.GoogleMixin):
@@ -149,6 +155,7 @@ def main():
         template_path=os.path.join(os.path.dirname(__file__), "templates"),
         static_path=os.path.join(os.path.dirname(__file__), "static"),
         xsrf_cookies=True,
+        debug=options.debug,
         )
     app.listen(options.port)
     tornado.ioloop.IOLoop.instance().start()