From: Michael Tremer
Date: Thu, 19 Sep 2024 06:39:56 +0000 (+0000)
Subject: server: Experimental changes to make this multithreaded
X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=00815c01cc64a6e309a3b1790d1e48509abcfc76;p=fireperf.git

server: Experimental changes to make this multithreaded

We seem to be hitting a bottleneck with the single-threaded approach on
weaker processors which makes it a necessity to have more threads.

This is a proof of concept.

Signed-off-by: Michael Tremer
---

diff --git a/src/main.c b/src/main.c
index 1d996cd..fc351e8 100644
--- a/src/main.c
+++ b/src/main.c
@@ -338,7 +338,7 @@ int main(int argc, char* argv[]) {
 			return fireperf_client(&conf, &stats, epollfd, timerfd);
 
 		case FIREPERF_MODE_SERVER:
-			return fireperf_server(&conf, &stats, epollfd, timerfd);
+			return fireperf_server(&conf, epollfd, timerfd);
 
 		case FIREPERF_MODE_NONE:
 			fprintf(stderr, "No mode selected\n");
diff --git a/src/server.c b/src/server.c
index 19cc345..85316dd 100644
--- a/src/server.c
+++ b/src/server.c
@@ -20,6 +20,7 @@
 
 #include
 #include
+#include <pthread.h>
 #include
 #include
 #include
@@ -32,8 +33,292 @@
 #include "server.h"
 #include "util.h"
 
+#define MAX_WORKERS 128
 #define SOCKET_BACKLOG 1024
 
+struct fireperf_worker {
+	// Thread
+	pthread_t thread;
+
+	// Configuration
+	struct fireperf_config* conf;
+
+	// Collect stats
+	struct fireperf_stats stats;
+
+	// Client Socket
+	int client;
+
+	// Epoll
+	int epollfd;
+
+	// Return code
+	int r;
+};
+
+struct fireperf_server {
+	// Configuration
+	struct fireperf_config* conf;
+
+	// Collect stats
+	struct fireperf_stats stats;
+
+	// Workers
+	struct fireperf_worker* workers[MAX_WORKERS];
+	unsigned int num_workers;
+};
+
+// Returns non-zero if the worker's thread is still running (or could not
+// be joined). Returns zero once the thread has terminated, in which case
+// it has been joined by this call and must not be joined again.
+static int is_worker_alive(struct fireperf_worker* worker) {
+	return pthread_tryjoin_np(worker->thread, NULL);
+}
+
+// Waits for the worker's thread to terminate. Returns zero on success or
+// a negative error code.
+static int join_worker(struct fireperf_worker* worker) {
+	int r;
+
+	// Join the worker
+	r = pthread_join(worker->thread, NULL);
+	if (r) {
+		ERROR(worker->conf, "Could not join worker: %s\n", strerror(r));
+		return -r;
+	}
+
+	return 0;
+}
+
+static void free_worker(struct fireperf_worker* worker) {
+	free(worker);
+}
+
+// Frees all workers whose threads have terminated and makes their slots
+// available again. Always returns zero.
+static int free_terminated_workers(struct fireperf_server* server) {
+	for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+		struct fireperf_worker* worker = server->workers[i];
+
+		// Skip any empty slots
+		if (!worker)
+			continue;
+
+		// Skip any workers that are still alive
+		if (is_worker_alive(worker))
+			continue;
+
+		// Keep the totals of this worker so that they won't disappear
+		// from the cumulated stats once the worker is gone
+		server->stats.total_bytes_received += worker->stats.total_bytes_received;
+		server->stats.total_bytes_sent += worker->stats.total_bytes_sent;
+
+		// Free worker
+		free_worker(worker);
+
+		// Clear the slot and make space for a new worker
+		server->workers[i] = NULL;
+		--server->num_workers;
+	}
+
+	return 0;
+}
+
+// Thread entry point: serves the worker's client socket using a private
+// epoll loop until the client disconnects or an error occurs. Returns a
+// pointer to the return code which is stored in the worker.
+static void* run_worker(void* w) {
+	struct fireperf_worker* worker = w;
+	int ready;
+	int fd = -1;
+	int r = 0;
+
+	struct epoll_event events[EPOLL_MAX_EVENTS];
+
+	DEBUG(worker->conf, "New worker launched as %lu...\n", pthread_self());
+
+	// We have exactly one open connection
+	worker->stats.open_connections = 1;
+
+	worker->epollfd = -1;
+
+	// Initialize epoll()
+	worker->epollfd = epoll_create1(0);
+	if (worker->epollfd < 0) {
+		r = -errno;
+		ERROR(worker->conf, "Could not initialize epoll(): %s\n", strerror(-r));
+		goto ERROR;
+	}
+
+	struct epoll_event ev = {
+		.events = EPOLLIN|EPOLLRDHUP,
+		.data.fd = worker->client,
+	};
+
+	if (!worker->conf->keepalive_only)
+		ev.events |= EPOLLOUT;
+
+	// Add the socket to the loop
+	r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, worker->client, &ev);
+	if (r) {
+		r = -errno;
+		ERROR(worker->conf, "Could not add the socket to epoll(): %s\n", strerror(-r));
+		goto ERROR;
+	}
+
+	// Enter the main loop...
+	for (;;) {
+		ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
+		if (ready < 1) {
+			// We terminate gracefully when we receive a signal
+			if (errno == EINTR)
+				break;
+
+			r = -errno;
+			ERROR(worker->conf, "epoll_wait() failed: %s\n", strerror(-r));
+			goto ERROR;
+		}
+
+		// Loop through all sockets that are ready...
+		for (int i = 0; i < ready; i++) {
+			fd = events[i].data.fd;
+
+			// Handle the client socket
+			if (worker->client == fd) {
+				if (events[i].events & EPOLLRDHUP) {
+					DEBUG(worker->conf, "Connection %d has closed\n", fd);
+
+					// There is no point in continuing any further
+					// after the client went away
+					goto ERROR;
+				}
+
+				// Handle incoming data
+				if (events[i].events & EPOLLIN) {
+					r = handle_connection_recv(worker->conf, &worker->stats, fd);
+					if (r < 0)
+						goto ERROR;
+				}
+
+				// Handle outgoing data
+				if (events[i].events & EPOLLOUT) {
+					r = handle_connection_send(worker->conf, &worker->stats, fd);
+					if (r < 0)
+						goto ERROR;
+				}
+			}
+		}
+	}
+
+ERROR:
+	if (worker->epollfd >= 0)
+		close(worker->epollfd);
+	if (worker->client >= 0)
+		close(worker->client);
+
+	// Store the return code
+	worker->r = r;
+
+	return &worker->r;
+}
+
+// Launches a new worker thread for the connection fd. Returns zero on
+// success or a negative error code, in which case fd has not been closed
+// and no worker has been registered.
+static int create_worker(struct fireperf_server* server, const int fd) {
+	struct fireperf_worker* worker = NULL;
+	unsigned int slot = MAX_WORKERS;
+	int r;
+
+	DEBUG(server->conf, "Creating a new worker for %d\n", fd);
+
+	// Find the first free slot
+	for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+		if (!server->workers[i]) {
+			slot = i;
+			break;
+		}
+	}
+
+	// Check if we have space for a new worker
+	if (server->num_workers >= MAX_WORKERS || slot >= MAX_WORKERS) {
+		ERROR(server->conf, "Maximum number of workers reached. Closing connection.\n");
+		return -ENOSPC;
+	}
+
+	// Allocate a new worker
+	worker = calloc(1, sizeof(*worker));
+	if (!worker)
+		return -errno;
+
+	// Store a reference to the configuration
+	worker->conf = server->conf;
+
+	// Store the client socket
+	worker->client = fd;
+
+	// Run the worker
+	r = pthread_create(&worker->thread, NULL, run_worker, worker);
+	if (r) {
+		ERROR(server->conf, "Could not launch thread: %s\n", strerror(r));
+
+		// The thread never started, so nobody else will free the worker
+		free_worker(worker);
+		return -r;
+	}
+
+	// Store the worker only after its thread exists, so that every
+	// occupied slot can safely be joined
+	server->workers[slot] = worker;
+
+	// We now have more workers
+	++server->num_workers;
+
+	return 0;
+}
+
+// Sums up the stats of the server and of all workers into stats and
+// resets the per-period byte counters of the workers. Always returns zero.
+// NOTE(review): The worker threads appear to update these counters without
+// any synchronization, so the values read here may be inexact - confirm.
+static int cumulate_stats(struct fireperf_stats* stats, struct fireperf_server* server) {
+	struct fireperf_worker* worker = NULL;
+
+	// Reset open connections
+	stats->open_connections = 0;
+
+	// Fetch the total number of connections
+	stats->connections = server->stats.connections;
+
+	// Reset what has been sent in the last period
+	stats->bytes_received = 0;
+	stats->bytes_sent = 0;
+
+	// Reset the stats to the server's stats
+	stats->total_bytes_received = server->stats.total_bytes_received;
+	stats->total_bytes_sent = server->stats.total_bytes_sent;
+
+	// Add up all the worker's stats
+	for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+		worker = server->workers[i];
+		if (!worker)
+			continue;
+
+		stats->open_connections += worker->stats.open_connections;
+
+		stats->bytes_received += worker->stats.bytes_received;
+		stats->total_bytes_received += worker->stats.total_bytes_received;
+		stats->bytes_sent += worker->stats.bytes_sent;
+		stats->total_bytes_sent += worker->stats.total_bytes_sent;
+
+		// Reset
+		worker->stats.bytes_received = 0;
+		worker->stats.bytes_sent = 0;
+	}
+
+	return 0;
+}
+
 static int enable_keepalive(struct fireperf_config* conf, int fd) {
 	// Enable keepalive
 	int flags = 1;
@@ -151,8 +407,10 @@ ERROR: return 
-1; } -static int accept_connection(struct fireperf_config* conf, int sockfd) { - struct sockaddr_in6 addr; +static int accept_connection(struct fireperf_server* server, int sockfd) { + struct sockaddr_in6 addr = {}; + int r; + socklen_t l = sizeof(addr); int fd = -1; @@ -163,13 +421,38 @@ static int accept_connection(struct fireperf_config* conf, int sockfd) { } while (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)); if (fd < 0) { - ERROR(conf, "Could not accept a new connection: %s\n", strerror(errno)); + ERROR(server->conf, "Could not accept a new connection: %s\n", strerror(errno)); return -1; } - DEBUG(conf, "New connection accepted on socket %d\n", fd); + DEBUG(server->conf, "New connection accepted on socket %d\n", fd); + + // A connection has been opened + server->stats.open_connections++; + server->stats.connections++; + + // Close connections immediately when -x is set + if (server->conf->close) { + DEBUG(server->conf, "Closing connection %d\n", fd); + r = 0; + + goto ERROR; + } + + // Create a new worker + r = create_worker(server, fd); + if (r < 0) { + ERROR(server->conf, "Could not create worker: %s\n", strerror(-r)); + goto ERROR; + } return fd; + +ERROR: + if (fd >= 0) + close(fd); + + return r; } static int is_listening_socket(struct fireperf_config* conf, int* sockets, int fd) { @@ -181,13 +464,18 @@ static int is_listening_socket(struct fireperf_config* conf, int* sockets, int f return 0; } -int fireperf_server(struct fireperf_config* conf, struct fireperf_stats* stats, - int epollfd, int timerfd) { +int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd) { + struct fireperf_server server = { + .conf = conf, + }; + struct fireperf_stats stats = {}; + uint64_t expirations = 0; + int r; + DEBUG(conf, "Launching " PACKAGE_NAME " in server mode\n"); int listening_sockets[conf->listening_sockets]; - int r = 1; struct epoll_event ev; struct epoll_event events[EPOLL_MAX_EVENTS]; @@ -228,30 +516,12 @@ int fireperf_server(struct 
fireperf_config* conf, struct fireperf_stats* stats, // The listening socket if (is_listening_socket(conf, listening_sockets, fd)) { - int connfd = accept_connection(conf, fd); - if (connfd < 0) + r = accept_connection(&server, fd); + if (r < 0) goto ERROR; - // Add the new socket to epoll() - ev.data.fd = connfd; - ev.events = EPOLLIN|EPOLLRDHUP; - if (!conf->keepalive_only) - ev.events |= EPOLLOUT; - - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev)) { - ERROR(conf, "Could not add socket file descriptor to epoll(): %s\n", - strerror(errno)); - goto ERROR; - } - - // A connection has been opened - stats->open_connections++; - stats->connections++; - // Handle timer events } else if (fd == timerfd) { - uint64_t expirations; - // Read from the timer to disarm it ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations)); if (bytes_read <= 0) { @@ -259,53 +529,20 @@ int fireperf_server(struct fireperf_config* conf, struct fireperf_stats* stats, goto ERROR; } - r = fireperf_dump_stats(conf, stats, FIREPERF_MODE_SERVER); + // Free any terminated workers + r = free_terminated_workers(&server); if (r) goto ERROR; - // Handle any connection events - } else { - if (events[i].events & EPOLLRDHUP) { - DEBUG(conf, "Connection %d has closed\n", fd); - - // Remove the file descriptor from epoll() - if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) { - ERROR(conf, "Could not remove socket file descriptfor from epoll(): %s\n", - strerror(errno)); - } - - // Free up any resources - close(fd); - - // This connection is now closed - stats->open_connections--; - - // Skip processing anything else, because it would be pointless - continue; - } - - // Close connections immediately when -x is set - if (conf->close) { - DEBUG(conf, "Closing connection %d\n", fd); - close(fd); - - stats->open_connections--; - continue; - } - - // Handle incoming data - if (events[i].events & EPOLLIN) { - r = handle_connection_recv(conf, stats, fd); - if (r < 0) - goto ERROR; - } + // Cumulate 
stats + r = cumulate_stats(&stats, &server); + if (r) + goto ERROR; - // Handle outgoing data - if (events[i].events & EPOLLOUT) { - r = handle_connection_send(conf, stats, fd); - if (r < 0) - goto ERROR; - } + // Print the stats + r = fireperf_dump_stats(conf, &stats, FIREPERF_MODE_SERVER); + if (r) + goto ERROR; } } } @@ -316,5 +553,18 @@ ERROR: close(listening_sockets[i]); } + // Wait for all workers to terminate + for (unsigned int i = 0; i < MAX_WORKERS; i++) { + if (server.workers[i]) { + // Wait for the worker to terminate + join_worker(server.workers[i]); + + // Free the worker + free_worker(server.workers[i]); + + server.workers[i] = NULL; + } + } + return r; } diff --git a/src/server.h b/src/server.h index cd40090..b6984c0 100644 --- a/src/server.h +++ b/src/server.h @@ -23,7 +23,6 @@ #include "main.h" -int fireperf_server(struct fireperf_config* conf, struct fireperf_stats* stats, - int epollfd, int timerfd); +int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd); #endif /* FIREPERF_SERVER_H */