From 6a0fb025af0c345fd6b98d9cbb8c7b45ee87d920 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 13 May 2018 13:11:08 -0400 Subject: [PATCH] demos: Simplify chat demo using a Condition --- demos/chat/chatdemo.py | 89 ++++++++++++++++++++---------------------- 1 file changed, 43 insertions(+), 46 deletions(-) diff --git a/demos/chat/chatdemo.py b/demos/chat/chatdemo.py index 89149c420..05b9bd8be 100755 --- a/demos/chat/chatdemo.py +++ b/demos/chat/chatdemo.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # Copyright 2009 Facebook # @@ -14,58 +14,45 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +import asyncio import tornado.escape import tornado.ioloop +import tornado.locks import tornado.web import os.path import uuid -from tornado.concurrent import Future -from tornado import gen from tornado.options import define, options, parse_command_line define("port", default=8888, help="run on the given port", type=int) -define("debug", default=False, help="run in debug mode") +define("debug", default=True, help="run in debug mode") class MessageBuffer(object): def __init__(self): - self.waiters = set() + # cond is notified whenever the message cache is updated + self.cond = tornado.locks.Condition() self.cache = [] self.cache_size = 200 - def wait_for_messages(self, cursor=None): - # Construct a Future to return to our caller. This allows - # wait_for_messages to be yielded from a coroutine even though - # it is not a coroutine itself. We will set the result of the - # Future when results are available. - result_future = Future() - if cursor: - new_count = 0 - for msg in reversed(self.cache): - if msg["id"] == cursor: - break - new_count += 1 - if new_count: - result_future.set_result(self.cache[-new_count:]) - return result_future - self.waiters.add(result_future) - return result_future - - def cancel_wait(self, future): - self.waiters.remove(future) - # Set an empty result to unblock any coroutines waiting. - future.set_result([]) - - def new_messages(self, messages): - logging.info("Sending new message to %r listeners", len(self.waiters)) - for future in self.waiters: - future.set_result(messages) - self.waiters = set() - self.cache.extend(messages) + def get_messages_since(self, cursor): + """Returns a list of messages newer than the given cursor. + + ``cursor`` should be the ``id`` of the last message received. + """ + results = [] + for msg in reversed(self.cache): + if msg["id"] == cursor: + break + results.append(msg) + results.reverse() + return results + + def add_message(self, message): + self.cache.append(message) if len(self.cache) > self.cache_size: self.cache = self.cache[-self.cache_size:] + self.cond.notify_all() # Making this a non-singleton is left as an exercise for the reader. @@ -78,36 +65,46 @@ class MainHandler(tornado.web.RequestHandler): class MessageNewHandler(tornado.web.RequestHandler): + """Post a new message to the chat room.""" def post(self): message = { "id": str(uuid.uuid4()), "body": self.get_argument("body"), } - # to_basestring is necessary for Python 3's json encoder, - # which doesn't accept byte strings. - message["html"] = tornado.escape.to_basestring( + # render_string() returns a byte string, which is not supported + # in json, so we must convert it to a character string. + message["html"] = tornado.escape.to_unicode( self.render_string("message.html", message=message)) if self.get_argument("next", None): self.redirect(self.get_argument("next")) else: self.write(message) - global_message_buffer.new_messages([message]) + global_message_buffer.add_message(message) class MessageUpdatesHandler(tornado.web.RequestHandler): - @gen.coroutine - def post(self): + """Long-polling request for new messages. + + Waits until new messages are available before returning anything. + """ + async def post(self): cursor = self.get_argument("cursor", None) - # Save the future returned by wait_for_messages so we can cancel - # it in wait_for_messages - self.future = global_message_buffer.wait_for_messages(cursor=cursor) - messages = yield self.future + messages = global_message_buffer.get_messages_since(cursor) + while not messages: + # Save the Future returned here so we can cancel it in + # on_connection_close. + self.wait_future = global_message_buffer.cond.wait() + try: + await self.wait_future + except asyncio.CancelledError: + return + messages = global_message_buffer.get_messages_since(cursor) if self.request.connection.stream.closed(): return self.write(dict(messages=messages)) def on_connection_close(self): - global_message_buffer.cancel_wait(self.future) + self.wait_future.cancel() def main(): -- 2.47.2