import utils
-# TODO: test often fails in CI, debug and maybe decrease MAX_SOCKETS?
MAX_SOCKETS = 15000 # upper bound of how many connections to open
MAX_ITERATIONS = 20 # number of iterations to run the test
])
def test_conn_flood(tmpdir, sock_func_name):
def create_sockets(make_sock, nsockets):
- buff, _ = utils.get_msgbuff()
sockets = []
-
next_ping = time.time() + 5 # less than tcp idle timeout
while True:
while time.time() < next_ping:
# large number of connections can take a lot of time to open
# send some valid data to avoid TCP idle timeout for already open sockets
- for s in sockets:
- s.sendall(buff)
next_ping = time.time() + 5
+ for s in sockets:
+ utils.ping_alive(s)
max_num_of_open_files = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - RESERVED_NOFILE
nsockets = min(max_num_of_open_files, MAX_SOCKETS)