import socket
import time
import unittest
+from test import support
if not hasattr(select, "epoll"):
raise unittest.SkipTest("test works only on Linux 2.6")
client.sendall(b"Hello!")
server.sendall(b"world!!!")
- now = time.monotonic()
- events = ep.poll(1.0, 4)
- then = time.monotonic()
- self.assertFalse(then - now > 0.01)
+ # we might receive events one at a time, necessitating multiple calls to
+ # poll
+ events = []
+ for _ in support.busy_retry(support.SHORT_TIMEOUT):
+ now = time.monotonic()
+ events += ep.poll(1.0, 4)
+ then = time.monotonic()
+ self.assertFalse(then - now > 0.01)
+ if len(events) >= 2:
+ break
expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
(server.fileno(), select.EPOLLIN | select.EPOLLOUT)]