import asyncio
import contextlib
+import datetime
import functools
import socket
import traceback
return app
@staticmethod
- def suppress_pong(ws):
- """Suppress the client's "pong" response."""
+ def install_hook(ws):
+ """Optionally suppress the client's "pong" response."""
+
+ ws.drop_pongs = False
+ ws.pongs_received = 0
def wrapper(fcn):
- def _inner(oppcode: int, data: bytes):
- if oppcode == 0xA: # NOTE: 0x9=ping, 0xA=pong
- # prevent pong responses
- return
+ def _inner(opcode: int, data: bytes):
+ if opcode == 0xA: # NOTE: 0x9=ping, 0xA=pong
+ ws.pongs_received += 1
+ if ws.drop_pongs:
+ # prevent pong responses
+ return
# leave all other responses unchanged
- return fcn(oppcode, data)
+ return fcn(opcode, data)
return _inner
ws = yield self.ws_connect(
"/", ping_interval=interval, ping_timeout=interval / 4
)
+ self.install_hook(ws)
# websocket handler (server side)
handler = self.handlers[0]
for _ in range(5):
# wait for the ping period
- yield gen.sleep(0.2)
+ yield gen.sleep(interval)
# connection should still be open from the server end
self.assertIsNone(handler.close_code)
# connection should still be open from the client end
assert ws.protocol.close_code is None
+ # Check that our hook is intercepting messages; allow for
+ # some variance in timing (due to e.g. cpu load)
+ self.assertGreaterEqual(ws.pongs_received, 4)
+
# suppress the pong response message
- self.suppress_pong(ws)
+ ws.drop_pongs = True
# give the server time to register this
yield gen.sleep(interval * 1.5)
self.assertEqual(ws.protocol.close_code, 1000)
+class PingCalculationTest(unittest.TestCase):
+ def test_ping_sleep_time(self):
+ from tornado.websocket import WebSocketProtocol13
+
+ now = datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
+ interval = 10 # seconds
+ last_ping_time = datetime.datetime(
+ 2025, 1, 1, 11, 59, 54, tzinfo=datetime.timezone.utc
+ )
+ sleep_time = WebSocketProtocol13.ping_sleep_time(
+ last_ping_time=last_ping_time.timestamp(),
+ interval=interval,
+ now=now.timestamp(),
+ )
+ self.assertEqual(sleep_time, 4)
+
+
class ManualPingTest(WebSocketBaseTestCase):
def get_app(self):
class PingHandler(TestWebSocketHandler):
):
self._ping_coroutine = asyncio.create_task(self.periodic_ping())
+ @staticmethod
+ def ping_sleep_time(*, last_ping_time: float, interval: float, now: float) -> float:
+ """Calculate the sleep time until the next ping should be sent."""
+ return max(0, last_ping_time + interval - now)
+
async def periodic_ping(self) -> None:
"""Send a ping and wait for a pong if ping_timeout is configured.
return
# wait until the next scheduled ping
- await asyncio.sleep(ping_time + interval - IOLoop.current().time())
+ await asyncio.sleep(
+ self.ping_sleep_time(
+ last_ping_time=ping_time,
+ interval=interval,
+ now=IOLoop.current().time(),
+ )
+ )
class WebSocketClientConnection(simple_httpclient._HTTPConnection):