From 5f2a4968ffd70004b01cdc8798c0fcc83b1edf96 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Mon, 1 Feb 2021 17:43:12 +0000 Subject: [PATCH] client: Try to keep "conf->parallel" connections open Signed-off-by: Michael Tremer --- src/client.c | 83 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 29 deletions(-) diff --git a/src/client.c b/src/client.c index 3de8a5d..82ae528 100644 --- a/src/client.c +++ b/src/client.c @@ -31,6 +31,12 @@ #include "logging.h" #include "main.h" +// Struct to collect client statistics +struct fireperf_client_stats { + // Total number of open connections + unsigned int connections; +}; + // Set to one when the timeout has expired static int timeout_expired = 0; @@ -95,8 +101,15 @@ static const char* fireperf_random_pool_get_slice(struct fireperf_random_pool* p return pool->data + offset; } -static int connect_socket(struct fireperf_config* conf, int fd) { - DEBUG(conf, "(Re-)connecting socket %d...\n", fd); +static int open_connection(struct fireperf_config* conf) { + // Open a new socket + int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); + if (fd < 0) { + ERROR(conf, "Could not open socket: %s\n", strerror(errno)); + goto ERROR; + } + + DEBUG(conf, "Opening socket %d...\n", fd); // Define the peer struct sockaddr_in6 peer = { @@ -111,7 +124,7 @@ static int connect_socket(struct fireperf_config* conf, int fd) { if (r) { ERROR(conf, "Could not set SO_KEEPALIVE on socket %d: %s\n", fd, strerror(errno)); - return 1; + goto ERROR; } // Set socket buffer sizes @@ -120,7 +133,7 @@ static int connect_socket(struct fireperf_config* conf, int fd) { if (r) { ERROR(conf, "Could not set send buffer size on socket %d: %s\n", fd, strerror(errno)); - return 1; + goto ERROR; } // Set keepalive interval @@ -132,7 +145,7 @@ static int connect_socket(struct fireperf_config* conf, int fd) { if (r) { ERROR(conf, "Could not set TCP_KEEPINTVL on socket %d: %s\n", fd, strerror(errno)); - return 1; + goto ERROR; } DEBUG(conf, "Setting keepalive idle interval to %d\n", conf->keepalive_interval); @@ -143,7 +156,7 @@ static int connect_socket(struct fireperf_config* conf, int fd) { if (r) { ERROR(conf, "Could not set TCP_KEEPIDLE on socket %d: %s\n", fd, strerror(errno)); - return 1; + goto ERROR; } } @@ -156,7 +169,7 @@ static int connect_socket(struct fireperf_config* conf, int fd) { if (r) { ERROR(conf, "Could not set TCP_KEEPCNT on socket %d: %s\n", fd, strerror(errno)); - return 1; + goto ERROR; } } @@ -164,10 +177,16 @@ static int connect_socket(struct fireperf_config* conf, int fd) { r = connect(fd, &peer, sizeof(peer)); if (r && (errno != EINPROGRESS)) { ERROR(conf, "Could not connect to server: %s\n", strerror(errno)); - return 1; + goto ERROR; } - return 0; + return fd; + +ERROR: + if (fd > 0) + close(fd); + + return -1; } static int send_data_to_server(struct fireperf_config* conf, int fd, @@ -189,6 +208,7 @@ static int send_data_to_server(struct fireperf_config* conf, int fd, } int fireperf_client(struct fireperf_config* conf) { + struct fireperf_client_stats stats = { 0 }; struct fireperf_random_pool* pool = NULL; DEBUG(conf, "Launching " PACKAGE_NAME " in client mode\n"); @@ -223,24 +243,6 @@ int fireperf_client(struct fireperf_config* conf) { DEBUG(conf, "Opening %lu connections...\n", conf->parallel); - // Open connections - for (unsigned int i = 0; i < conf->parallel; i++) { - // Open a new socket - int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); - if (fd < 0) { - ERROR(conf, "Could not open socket: %s\n", strerror(errno)); - goto ERROR; - } - - ev.data.fd = fd; - - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) { - ERROR(conf, "Could not add socket file descriptor to epoll(): %s\n", - strerror(errno)); - goto ERROR; - } - } - // Configure timeout if set if (conf->timeout) { // Register signal handler @@ -252,6 +254,23 @@ int fireperf_client(struct fireperf_config* conf) { DEBUG(conf, "Entering main loop...\n"); while (!conf->terminated && !timeout_expired) { + // Open connections + while (stats.connections < conf->parallel) { + int fd = open_connection(conf); + if (fd < 0) + continue; + + ev.data.fd = fd; + + if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) { + ERROR(conf, "Could not add socket file descriptor to epoll(): %s\n", + strerror(errno)); + goto ERROR; + } + + stats.connections++; + } + int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1); if (fds < 1) { // We terminate gracefully when we receive a signal @@ -269,9 +288,15 @@ int fireperf_client(struct fireperf_config* conf) { // Has the socket been disconnected? if (events[i].events & EPOLLHUP) { - r = connect_socket(conf, fd); - if (r) + if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) { + ERROR(conf, "Could not remove socket file descriptor from epoll(): %s\n", + strerror(errno)); goto ERROR; + } + + close(fd); + + stats.connections--; } else if (events[i].events & EPOLLOUT) { r = send_data_to_server(conf, fd, pool); -- 2.47.2