From: Ben Darnell Date: Sat, 5 Jul 2014 21:55:44 +0000 (-0400) Subject: Rewrite chatdemo to use a coroutine instead of callbacks. X-Git-Tag: v4.0.0b3~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2fdfcb1b192e29b555f1cd2b5a6a1d25b2b4dbcf;p=thirdparty%2Ftornado.git Rewrite chatdemo to use a coroutine instead of callbacks. --- diff --git a/demos/chat/chatdemo.py b/demos/chat/chatdemo.py index a8f5bb81f..7e3af039a 100755 --- a/demos/chat/chatdemo.py +++ b/demos/chat/chatdemo.py @@ -22,10 +22,12 @@ import tornado.web import os.path import uuid +from tornado.concurrent import Future from tornado import gen from tornado.options import define, options, parse_command_line define("port", default=8888, help="run on the given port", type=int) +define("debug", default=False, help="run in debug mode") class MessageBuffer(object): @@ -34,7 +36,12 @@ class MessageBuffer(object): self.cache = [] self.cache_size = 200 - def wait_for_messages(self, callback, cursor=None): + def wait_for_messages(self, cursor=None): + # Construct a Future to return to our caller. This allows + # wait_for_messages to be yielded from a coroutine even though + # it is not a coroutine itself. We will set the result of the + # Future when results are available. + result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): @@ -42,20 +49,20 @@ class MessageBuffer(object): break new_count += 1 if new_count: - callback(self.cache[-new_count:]) - return - self.waiters.add(callback) + result_future.set_result(callback(self.cache[-new_count:])) + return result_future + self.waiters.add(result_future) + return result_future - def cancel_wait(self, callback): - self.waiters.remove(callback) + def cancel_wait(self, future): + self.waiters.remove(future) + # Set an empty result to unblock any coroutines waiting. + future.set_result([]) def new_messages(self, messages): logging.info("Sending new message to %r listeners", len(self.waiters)) - for callback in self.waiters: - try: - callback(messages) - except: - logging.error("Error in waiter callback", exc_info=True) + for future in self.waiters: + future.set_result(messages) self.waiters = set() self.cache.extend(messages) if len(self.cache) > self.cache_size: @@ -100,20 +107,19 @@ class MessageNewHandler(BaseHandler): class MessageUpdatesHandler(BaseHandler): @tornado.web.authenticated - @tornado.web.asynchronous + @gen.coroutine def post(self): cursor = self.get_argument("cursor", None) - global_message_buffer.wait_for_messages(self.on_new_messages, - cursor=cursor) - - def on_new_messages(self, messages): - # Closed client connection + # Save the future returned by wait_for_messages so we can cancel + # it in wait_for_messages + self.future = global_message_buffer.wait_for_messages(cursor=cursor) + messages = yield self.future if self.request.connection.stream.closed(): return - self.finish(dict(messages=messages)) + self.write(dict(messages=messages)) def on_connection_close(self): - global_message_buffer.cancel_wait(self.on_new_messages) + global_message_buffer.cancel_wait(self.future) class AuthLoginHandler(BaseHandler, tornado.auth.GoogleMixin): @@ -149,6 +155,7 @@ def main(): template_path=os.path.join(os.path.dirname(__file__), "templates"), static_path=os.path.join(os.path.dirname(__file__), "static"), xsrf_cookies=True, + debug=options.debug, ) app.listen(options.port) tornado.ioloop.IOLoop.instance().start()