from tornado.iostream import StreamClosedError
from tornado.log import gen_log, app_log
from tornado import simple_httpclient
+from tornado.queues import Queue
from tornado.tcpclient import TCPClient
from tornado.util import _websocket_mask, PY3
self.compression_options = compression_options
self.connect_future = Future()
self.protocol = None
- self.read_future = None
- self.read_queue = collections.deque()
+ self.read_queue = Queue(1)
self.key = base64.b64encode(os.urandom(16))
self._on_message_callback = on_message_callback
self.close_code = self.close_reason = None
is given it will be called with the future when it is
ready.
"""
- assert self.read_future is None
- future = Future()
- if self.read_queue:
- future_set_result_unless_cancelled(future, self.read_queue.popleft())
- else:
- self.read_future = future
+
+ future = self.read_queue.get()
if callback is not None:
self.io_loop.add_future(future, callback)
return future
def on_message(self, message):
if self._on_message_callback:
self._on_message_callback(message)
- elif self.read_future is not None:
- future_set_result_unless_cancelled(self.read_future, message)
- self.read_future = None
else:
- self.read_queue.append(message)
+ return self.read_queue.put(message)
def ping(self, data=b''):
"""Send ping frame to the remote end.