class TestWebSocketHandler(WebSocketHandler):
"""Base class for testing handlers that exposes the on_close event.
- This allows for deterministic cleanup of the associated socket.
+ This allows for tests to see the close code and reason on the
+ server side.
+
"""
- def initialize(self, close_future, compression_options=None):
+ def initialize(self, close_future=None, compression_options=None):
self.close_future = close_future
self.compression_options = compression_options
return self.compression_options
def on_close(self):
- self.close_future.set_result((self.close_code, self.close_reason))
+ if self.close_future is not None:
+ self.close_future.set_result((self.close_code, self.close_reason))
class EchoHandler(TestWebSocketHandler):
class CoroutineOnMessageHandler(TestWebSocketHandler):
- def initialize(self, close_future, compression_options=None):
- super(CoroutineOnMessageHandler, self).initialize(
- close_future, compression_options
- )
+ def initialize(self, **kwargs):
+ super(CoroutineOnMessageHandler, self).initialize(**kwargs)
self.sleeping = 0
@gen.coroutine
)
raise gen.Return(ws)
- @gen.coroutine
- def close(self, ws):
- """Close a websocket connection and wait for the server side.
-
- If we don't wait here, there are sometimes leak warnings in the
- tests.
- """
- ws.close()
- yield self.close_future
-
class WebSocketTest(WebSocketBaseTestCase):
def get_app(self):
yield ws.write_message("hello")
response = yield ws.read_message()
self.assertEqual(response, "hello")
- yield self.close(ws)
def test_websocket_callbacks(self):
websocket_connect(
ws.write_message(b"hello \xe9", binary=True)
response = yield ws.read_message()
self.assertEqual(response, b"hello \xe9")
- yield self.close(ws)
@gen_test
def test_unicode_message(self):
ws.write_message(u"hello \u00e9")
response = yield ws.read_message()
self.assertEqual(response, u"hello \u00e9")
- yield self.close(ws)
@gen_test
def test_render_message(self):
ws.write_message("hello")
response = yield ws.read_message()
self.assertEqual(response, "<b>hello</b>")
- yield self.close(ws)
@gen_test
def test_error_in_on_message(self):
with ExpectLog(app_log, "Uncaught exception"):
response = yield ws.read_message()
self.assertIs(response, None)
- yield self.close(ws)
@gen_test
def test_websocket_http_fail(self):
ws.write_message("world")
# Close the underlying stream.
ws.stream.close()
- yield self.close_future
@gen_test
def test_websocket_headers(self):
)
response = yield ws.read_message()
self.assertEqual(response, "hello")
- yield self.close(ws)
@gen_test
def test_websocket_header_echo(self):
self.assertEqual(
ws.headers.get("X-Extra-Response-Header"), "Extra-Response-Value"
)
- yield self.close(ws)
@gen_test
def test_server_close_reason(self):
ws.write_message("hello")
response = yield ws.read_message()
self.assertEqual(response, "hello")
- yield self.close(ws)
@gen_test
def test_check_origin_valid_with_path(self):
ws.write_message("hello")
response = yield ws.read_message()
self.assertEqual(response, "hello")
- yield self.close(ws)
@gen_test
def test_check_origin_invalid_partial_url(self):
self.assertEqual(ws.selected_subprotocol, "goodproto")
res = yield ws.read_message()
self.assertEqual(res, "subprotocol=goodproto")
- yield self.close(ws)
@gen_test
def test_subprotocols_not_offered(self):
self.assertIs(ws.selected_subprotocol, None)
res = yield ws.read_message()
self.assertEqual(res, "subprotocol=None")
- yield self.close(ws)
@gen_test
def test_open_coroutine(self):
self.message_sent.set()
res = yield ws.read_message()
self.assertEqual(res, "ok")
- yield self.close(ws)
class NativeCoroutineOnMessageHandler(TestWebSocketHandler):
- def initialize(self, close_future, compression_options=None):
- super().initialize(close_future, compression_options)
+ def initialize(self, **kwargs):
+ super().initialize(**kwargs)
self.sleeping = 0
async def on_message(self, message):
class WebSocketNativeCoroutineTest(WebSocketBaseTestCase):
def get_app(self):
- self.close_future = Future() # type: Future[None]
- return Application(
- [
- (
- "/native",
- NativeCoroutineOnMessageHandler,
- dict(close_future=self.close_future),
- )
- ]
- )
+ return Application([("/native", NativeCoroutineOnMessageHandler)])
@gen_test
def test_native_coroutine(self):
MESSAGE = "Hello world. Testing 123 123"
def get_app(self):
- self.close_future = Future() # type: Future[None]
-
class LimitedHandler(TestWebSocketHandler):
@property
def max_message_size(self):
(
"/echo",
EchoHandler,
- dict(
- close_future=self.close_future,
- compression_options=self.get_server_compression_options(),
- ),
+ dict(compression_options=self.get_server_compression_options()),
),
(
"/limited",
LimitedHandler,
- dict(
- close_future=self.close_future,
- compression_options=self.get_server_compression_options(),
- ),
+ dict(compression_options=self.get_server_compression_options()),
),
]
)
self.assertEqual(ws.protocol._message_bytes_out, len(self.MESSAGE) * 3)
self.assertEqual(ws.protocol._message_bytes_in, len(self.MESSAGE) * 3)
self.verify_wire_bytes(ws.protocol._wire_bytes_in, ws.protocol._wire_bytes_out)
- yield self.close(ws)
@gen_test
def test_size_limit(self):
ws.write_message("a" * 2048)
response = yield ws.read_message()
self.assertIsNone(response)
- yield self.close(ws)
class UncompressedTestMixin(CompressionTestMixin):
def on_pong(self, data):
self.write_message("got pong")
- self.close_future = Future() # type: Future[None]
- return Application(
- [("/", PingHandler, dict(close_future=self.close_future))],
- websocket_ping_interval=0.01,
- )
+ return Application([("/", PingHandler)], websocket_ping_interval=0.01)
@gen_test
def test_server_ping(self):
for i in range(3):
response = yield ws.read_message()
self.assertEqual(response, "got pong")
- yield self.close(ws)
# TODO: test that the connection gets closed if ping responses stop.
def on_ping(self, data):
self.write_message("got ping")
- self.close_future = Future() # type: Future[None]
- return Application([("/", PingHandler, dict(close_future=self.close_future))])
+ return Application([("/", PingHandler)])
@gen_test
def test_client_ping(self):
for i in range(3):
response = yield ws.read_message()
self.assertEqual(response, "got ping")
- yield self.close(ws)
# TODO: test that the connection gets closed if ping responses stop.
def on_ping(self, data):
self.write_message(data, binary=isinstance(data, bytes))
- self.close_future = Future() # type: Future[None]
- return Application([("/", PingHandler, dict(close_future=self.close_future))])
+ return Application([("/", PingHandler)])
@gen_test
def test_manual_ping(self):
ws.ping(b"binary hello")
resp = yield ws.read_message()
self.assertEqual(resp, b"binary hello")
- yield self.close(ws)
class MaxMessageSizeTest(WebSocketBaseTestCase):
def get_app(self):
- self.close_future = Future() # type: Future[None]
- return Application(
- [("/", EchoHandler, dict(close_future=self.close_future))],
- websocket_max_message_size=1024,
- )
+ return Application([("/", EchoHandler)], websocket_max_message_size=1024)
@gen_test
def test_large_message(self):