]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
demos: Simplify chat demo using a Condition
authorBen Darnell <ben@bendarnell.com>
Sun, 13 May 2018 17:11:08 +0000 (13:11 -0400)
committerBen Darnell <ben@bendarnell.com>
Sun, 13 May 2018 19:07:15 +0000 (15:07 -0400)
demos/chat/chatdemo.py

index 89149c4209a930f130751efdc1896c8e80273094..05b9bd8be073eb4860d6e1f751ea86cb79eca563 100755 (executable)
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 #
 # Copyright 2009 Facebook
 #
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import logging
+import asyncio
 import tornado.escape
 import tornado.ioloop
+import tornado.locks
 import tornado.web
 import os.path
 import uuid
 
-from tornado.concurrent import Future
-from tornado import gen
 from tornado.options import define, options, parse_command_line
 
 define("port", default=8888, help="run on the given port", type=int)
-define("debug", default=False, help="run in debug mode")
+define("debug", default=True, help="run in debug mode")
 
 
 class MessageBuffer(object):
     def __init__(self):
-        self.waiters = set()
+        # cond is notified whenever the message cache is updated
+        self.cond = tornado.locks.Condition()
         self.cache = []
         self.cache_size = 200
 
-    def wait_for_messages(self, cursor=None):
-        # Construct a Future to return to our caller.  This allows
-        # wait_for_messages to be yielded from a coroutine even though
-        # it is not a coroutine itself.  We will set the result of the
-        # Future when results are available.
-        result_future = Future()
-        if cursor:
-            new_count = 0
-            for msg in reversed(self.cache):
-                if msg["id"] == cursor:
-                    break
-                new_count += 1
-            if new_count:
-                result_future.set_result(self.cache[-new_count:])
-                return result_future
-        self.waiters.add(result_future)
-        return result_future
-
-    def cancel_wait(self, future):
-        self.waiters.remove(future)
-        # Set an empty result to unblock any coroutines waiting.
-        future.set_result([])
-
-    def new_messages(self, messages):
-        logging.info("Sending new message to %r listeners", len(self.waiters))
-        for future in self.waiters:
-            future.set_result(messages)
-        self.waiters = set()
-        self.cache.extend(messages)
+    def get_messages_since(self, cursor):
+        """Returns a list of messages newer than the given cursor.
+
+        ``cursor`` should be the ``id`` of the last message received.
+        """
+        results = []
+        for msg in reversed(self.cache):
+            if msg["id"] == cursor:
+                break
+            results.append(msg)
+        results.reverse()
+        return results
+
+    def add_message(self, message):
+        self.cache.append(message)
         if len(self.cache) > self.cache_size:
             self.cache = self.cache[-self.cache_size:]
+        self.cond.notify_all()
 
 
 # Making this a non-singleton is left as an exercise for the reader.
@@ -78,36 +65,46 @@ class MainHandler(tornado.web.RequestHandler):
 
 
 class MessageNewHandler(tornado.web.RequestHandler):
+    """Post a new message to the chat room."""
     def post(self):
         message = {
             "id": str(uuid.uuid4()),
             "body": self.get_argument("body"),
         }
-        # to_basestring is necessary for Python 3's json encoder,
-        # which doesn't accept byte strings.
-        message["html"] = tornado.escape.to_basestring(
+        # render_string() returns a byte string, which is not supported
+        # in json, so we must convert it to a character string.
+        message["html"] = tornado.escape.to_unicode(
             self.render_string("message.html", message=message))
         if self.get_argument("next", None):
             self.redirect(self.get_argument("next"))
         else:
             self.write(message)
-        global_message_buffer.new_messages([message])
+        global_message_buffer.add_message(message)
 
 
 class MessageUpdatesHandler(tornado.web.RequestHandler):
-    @gen.coroutine
-    def post(self):
+    """Long-polling request for new messages.
+
+    Waits until new messages are available before returning anything.
+    """
+    async def post(self):
         cursor = self.get_argument("cursor", None)
-        # Save the future returned by wait_for_messages so we can cancel
-        # it in wait_for_messages
-        self.future = global_message_buffer.wait_for_messages(cursor=cursor)
-        messages = yield self.future
+        messages = global_message_buffer.get_messages_since(cursor)
+        while not messages:
+            # Save the Future returned here so we can cancel it in
+            # on_connection_close.
+            self.wait_future = global_message_buffer.cond.wait()
+            try:
+                await self.wait_future
+            except asyncio.CancelledError:
+                return
+            messages = global_message_buffer.get_messages_since(cursor)
         if self.request.connection.stream.closed():
             return
         self.write(dict(messages=messages))
 
     def on_connection_close(self):
-        global_message_buffer.cancel_wait(self.future)
+        self.wait_future.cancel()
 
 
 def main():