-#!/usr/bin/env python
+#!/usr/bin/env python3
#
# Copyright 2009 Facebook
#
# License for the specific language governing permissions and limitations
# under the License.
-import logging
+import asyncio
import tornado.escape
import tornado.ioloop
+import tornado.locks
import tornado.web
import os.path
import uuid
-from tornado.concurrent import Future
-from tornado import gen
from tornado.options import define, options, parse_command_line
define("port", default=8888, help="run on the given port", type=int)
-define("debug", default=False, help="run in debug mode")
+define("debug", default=True, help="run in debug mode")
class MessageBuffer(object):
def __init__(self):
- self.waiters = set()
+ # cond is notified whenever the message cache is updated
+ self.cond = tornado.locks.Condition()
self.cache = []
self.cache_size = 200
- def wait_for_messages(self, cursor=None):
- # Construct a Future to return to our caller. This allows
- # wait_for_messages to be yielded from a coroutine even though
- # it is not a coroutine itself. We will set the result of the
- # Future when results are available.
- result_future = Future()
- if cursor:
- new_count = 0
- for msg in reversed(self.cache):
- if msg["id"] == cursor:
- break
- new_count += 1
- if new_count:
- result_future.set_result(self.cache[-new_count:])
- return result_future
- self.waiters.add(result_future)
- return result_future
-
- def cancel_wait(self, future):
- self.waiters.remove(future)
- # Set an empty result to unblock any coroutines waiting.
- future.set_result([])
-
- def new_messages(self, messages):
- logging.info("Sending new message to %r listeners", len(self.waiters))
- for future in self.waiters:
- future.set_result(messages)
- self.waiters = set()
- self.cache.extend(messages)
+ def get_messages_since(self, cursor):
+ """Returns a list of messages newer than the given cursor.
+
+ ``cursor`` should be the ``id`` of the last message received.
+ """
+ results = []
+ for msg in reversed(self.cache):
+ if msg["id"] == cursor:
+ break
+ results.append(msg)
+ results.reverse()
+ return results
+
+ def add_message(self, message):
+ self.cache.append(message)
if len(self.cache) > self.cache_size:
self.cache = self.cache[-self.cache_size:]
+ self.cond.notify_all()
# Making this a non-singleton is left as an exercise for the reader.
class MessageNewHandler(tornado.web.RequestHandler):
+ """Post a new message to the chat room."""
def post(self):
message = {
"id": str(uuid.uuid4()),
"body": self.get_argument("body"),
}
- # to_basestring is necessary for Python 3's json encoder,
- # which doesn't accept byte strings.
- message["html"] = tornado.escape.to_basestring(
+ # render_string() returns a byte string, which is not supported
+ # in json, so we must convert it to a character string.
+ message["html"] = tornado.escape.to_unicode(
self.render_string("message.html", message=message))
if self.get_argument("next", None):
self.redirect(self.get_argument("next"))
else:
self.write(message)
- global_message_buffer.new_messages([message])
+ global_message_buffer.add_message(message)
class MessageUpdatesHandler(tornado.web.RequestHandler):
- @gen.coroutine
- def post(self):
+ """Long-polling request for new messages.
+
+ Waits until new messages are available before returning anything.
+ """
+ async def post(self):
cursor = self.get_argument("cursor", None)
- # Save the future returned by wait_for_messages so we can cancel
- # it in wait_for_messages
- self.future = global_message_buffer.wait_for_messages(cursor=cursor)
- messages = yield self.future
+ messages = global_message_buffer.get_messages_since(cursor)
+ while not messages:
+ # Save the Future returned here so we can cancel it in
+ # on_connection_close.
+ self.wait_future = global_message_buffer.cond.wait()
+ try:
+ await self.wait_future
+ except asyncio.CancelledError:
+ return
+ messages = global_message_buffer.get_messages_since(cursor)
if self.request.connection.stream.closed():
return
self.write(dict(messages=messages))
def on_connection_close(self):
- global_message_buffer.cancel_wait(self.future)
+ self.wait_future.cancel()
def main():