import os.path
import uuid
+from tornado.concurrent import Future
from tornado import gen
from tornado.options import define, options, parse_command_line
define("port", default=8888, help="run on the given port", type=int)
+define("debug", default=False, help="run in debug mode")
class MessageBuffer(object):
self.cache = []
self.cache_size = 200
- def wait_for_messages(self, callback, cursor=None):
+ def wait_for_messages(self, cursor=None):
+ # Construct a Future to return to our caller. This allows
+ # wait_for_messages to be yielded from a coroutine even though
+ # it is not a coroutine itself. We will set the result of the
+ # Future when results are available.
+ result_future = Future()
if cursor:
new_count = 0
for msg in reversed(self.cache):
break
new_count += 1
if new_count:
- callback(self.cache[-new_count:])
- return
- self.waiters.add(callback)
+ result_future.set_result(callback(self.cache[-new_count:]))
+ return result_future
+ self.waiters.add(result_future)
+ return result_future
- def cancel_wait(self, callback):
- self.waiters.remove(callback)
+ def cancel_wait(self, future):
+ self.waiters.remove(future)
+ # Set an empty result to unblock any coroutines waiting.
+ future.set_result([])
def new_messages(self, messages):
logging.info("Sending new message to %r listeners", len(self.waiters))
- for callback in self.waiters:
- try:
- callback(messages)
- except:
- logging.error("Error in waiter callback", exc_info=True)
+ for future in self.waiters:
+ future.set_result(messages)
self.waiters = set()
self.cache.extend(messages)
if len(self.cache) > self.cache_size:
class MessageUpdatesHandler(BaseHandler):
@tornado.web.authenticated
- @tornado.web.asynchronous
+ @gen.coroutine
def post(self):
cursor = self.get_argument("cursor", None)
- global_message_buffer.wait_for_messages(self.on_new_messages,
- cursor=cursor)
-
- def on_new_messages(self, messages):
- # Closed client connection
+ # Save the future returned by wait_for_messages so we can cancel
+ # it in wait_for_messages
+ self.future = global_message_buffer.wait_for_messages(cursor=cursor)
+ messages = yield self.future
if self.request.connection.stream.closed():
return
- self.finish(dict(messages=messages))
+ self.write(dict(messages=messages))
def on_connection_close(self):
- global_message_buffer.cancel_wait(self.on_new_messages)
+ global_message_buffer.cancel_wait(self.future)
class AuthLoginHandler(BaseHandler, tornado.auth.GoogleMixin):
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
+ debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.instance().start()