def send_json(self, data: typing.Any, mode: str = "text") -> None:
assert mode in ["text", "binary"]
- text = json.dumps(data, separators=(",", ":"))
+ text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
if mode == "text":
self.send({"type": "websocket.receive", "text": text})
else:
async def send_json(self, data: typing.Any, mode: str = "text") -> None:
if mode not in {"text", "binary"}:
raise RuntimeError('The "mode" argument should be "text" or "binary".')
- text = json.dumps(data, separators=(",", ":"))
+ text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
if mode == "text":
await self.send({"type": "websocket.send", "text": text})
else:
assert data == {"test": "data"}
+def test_websocket_ensure_unicode_on_send_json(
+ test_client_factory: Callable[..., TestClient]
+):
+ async def app(scope: Scope, receive: Receive, send: Send) -> None:
+ websocket = WebSocket(scope, receive=receive, send=send)
+
+ await websocket.accept()
+ message = await websocket.receive_json(mode="text")
+ await websocket.send_json(message, mode="text")
+ await websocket.close()
+
+ client = test_client_factory(app)
+ with client.websocket_connect("/123?a=abc") as websocket:
+ websocket.send_json({"test": "数据"}, mode="text")
+ data = websocket.receive_text()
+ assert data == '{"test":"数据"}'
+
+
def test_websocket_query_params(test_client_factory):
async def app(scope: Scope, receive: Receive, send: Send) -> None:
websocket = WebSocket(scope, receive=receive, send=send)