from tornado.testing import AsyncHTTPTestCase, gen_test, bind_unused_port, ExpectLog
from tornado.test.util import unittest
from tornado.web import Application, RequestHandler
+from tornado.util import u
try:
import tornado.websocket
ws.close()
self.wait()
+ @gen_test
+ def test_binary_message(self):
+ ws = yield websocket_connect(
+ 'ws://localhost:%d/echo' % self.get_http_port())
+ ws.write_message(b'hello \xe9', binary=True)
+ response = yield ws.read_message()
+ self.assertEqual(response, b'hello \xe9')
+ ws.close()
+ yield self.close_future
+
+ @gen_test
+ def test_unicode_message(self):
+ ws = yield websocket_connect(
+ 'ws://localhost:%d/echo' % self.get_http_port())
+ ws.write_message(u('hello \u00e9'))
+ response = yield ws.read_message()
+ self.assertEqual(response, u('hello \u00e9'))
+ ws.close()
+ yield self.close_future
+
@gen_test
def test_websocket_http_fail(self):
with self.assertRaises(HTTPError) as cm: