git.ipfire.org Git - fireperf.git/commitdiff
server: Experimental changes to make this multithreaded
author Michael Tremer <michael.tremer@ipfire.org>
Thu, 19 Sep 2024 06:39:56 +0000 (06:39 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Thu, 19 Sep 2024 06:39:56 +0000 (06:39 +0000)
We seem to be hitting a bottleneck with the single-threaded approach on
weaker processors which makes it a necessity to have more threads.

This is a proof of concept.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/main.c
src/server.c
src/server.h

index 1d996cd01b0820bbff63286b1ddbd6970f4b9cec..fc351e8adb7956cd32cf863d7e1f97564719c7de 100644 (file)
@@ -338,7 +338,7 @@ int main(int argc, char* argv[]) {
                        return fireperf_client(&conf, &stats, epollfd, timerfd);
 
                case FIREPERF_MODE_SERVER:
-                       return fireperf_server(&conf, &stats, epollfd, timerfd);
+                       return fireperf_server(&conf, epollfd, timerfd);
 
                case FIREPERF_MODE_NONE:
                        fprintf(stderr, "No mode selected\n");
index 19cc345d5debd5aa2146eb0e74749747b8e99534..85316dd71c8e8a77f3e95ccc036c36ecb1b6a27b 100644 (file)
@@ -20,6 +20,7 @@
 
 #include <errno.h>
 #include <netinet/tcp.h>
+#include <pthread.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
 #include "server.h"
 #include "util.h"
 
+#define MAX_WORKERS       128
 #define SOCKET_BACKLOG   1024
 
+struct fireperf_worker { // State of one worker thread that serves a single client connection
+       // Thread handle, filled in by pthread_create() in create_worker()
+       pthread_t thread;
+
+       // Configuration (pointer copied from the server in create_worker())
+       struct fireperf_config* conf;
+
+       // Per-worker stats; summed over all workers by cumulate_stats()
+       struct fireperf_stats stats;
+
+       // Client socket handed over in create_worker(); closed by run_worker() on exit
+       int client;
+
+       // Epoll instance created by run_worker() to watch the client socket
+       int epollfd;
+
+       // Return code stored by run_worker() just before the thread exits
+       int r;
+};
+
+struct fireperf_server { // Server-wide state: configuration, stats and the table of workers
+       // Configuration
+       struct fireperf_config* conf;
+
+       // Server-level stats; the connection counters are updated in accept_connection()
+       struct fireperf_stats stats;
+
+       // Worker slots (NULL means the slot is free) and the counter incremented by create_worker()
+       struct fireperf_worker* workers[MAX_WORKERS];
+       unsigned int num_workers;
+};
+
+// Probes the worker's thread without blocking.
+//
+// pthread_tryjoin_np() returns zero once the thread has terminated (and has
+// thereby been joined) and a non-zero error number such as EBUSY otherwise,
+// so any non-zero result is reported as "alive".
+static int is_worker_alive(struct fireperf_worker* worker) {
+       const int status = pthread_tryjoin_np(worker->thread, NULL);
+
+       return status;
+}
+
+// Blocks until the worker's thread has terminated.
+//
+// Returns 0 on success or a negative error number on failure.
+static int join_worker(struct fireperf_worker* worker) {
+       // pthread_join() returns the error number directly and does not set
+       // errno, so the result itself has to be used for reporting
+       int r = pthread_join(worker->thread, NULL);
+       if (r) {
+               ERROR(worker->conf, "Could not join worker: %s\n", strerror(r));
+               return -r;
+       }
+
+       return 0;
+}
+
+// Releases the memory held by a worker.
+//
+// The worker's thread writes to this structure right up until it exits, so
+// this must only be called once that thread is no longer running.
+static void free_worker(struct fireperf_worker* worker) {
+       free(worker);
+}
+
+// Reaps all workers whose threads have terminated.
+//
+// Every occupied slot is probed with is_worker_alive(). Workers that have
+// finished get their byte totals folded into the server's stats (so that the
+// reported totals do not drop once the worker is gone), are freed, and their
+// slot is released for reuse.
+//
+// Always returns 0.
+static int free_terminated_workers(struct fireperf_server* server) {
+       for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+               struct fireperf_worker* worker = server->workers[i];
+
+               // Skip any empty slots
+               if (!worker)
+                       continue;
+
+               // Skip any workers that are still alive
+               if (is_worker_alive(worker))
+                       continue;
+
+               // Keep what this worker has transferred in total
+               server->stats.total_bytes_received += worker->stats.total_bytes_received;
+               server->stats.total_bytes_sent     += worker->stats.total_bytes_sent;
+
+               // Free worker
+               free_worker(worker);
+
+               // Clear the slot
+               server->workers[i] = NULL;
+
+               // We now have fewer workers
+               if (server->num_workers > 0)
+                       --server->num_workers;
+       }
+
+       return 0;
+}
+
+// Thread entry point of a worker.
+//
+// Serves exactly one client connection (worker->client) using a private
+// epoll instance until the client disconnects, an error occurs, or
+// epoll_wait() is interrupted by a signal. Both the epoll instance and the
+// client socket are closed before the thread exits.
+//
+// Returns a pointer to worker->r, which holds 0 after a graceful exit or a
+// negative value after an error.
+static void* run_worker(void* w) {
+       struct fireperf_worker* worker = w;
+       int ready;
+       int fd = -1;
+
+       // Not every exit path sets a return code, so start out with success
+       int r = 0;
+
+       struct epoll_event events[EPOLL_MAX_EVENTS];
+
+       DEBUG(worker->conf, "New worker launched as %lu...\n", pthread_self());
+
+       // We have exactly one open connection
+       worker->stats.open_connections = 1;
+
+       // Initialize epoll()
+       worker->epollfd = epoll_create1(0);
+       if (worker->epollfd < 0) {
+               // Save errno first because logging might overwrite it
+               r = -errno;
+               ERROR(worker->conf, "Could not initialize epoll(): %s\n", strerror(-r));
+               goto ERROR;
+       }
+
+       struct epoll_event ev = {
+               .events  = EPOLLIN|EPOLLRDHUP,
+               .data.fd = worker->client,
+       };
+
+       // Only ask for writability if we are actually going to send data
+       if (!worker->conf->keepalive_only)
+               ev.events |= EPOLLOUT;
+
+       // Add the socket to the loop
+       if (epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, worker->client, &ev)) {
+               r = -errno;
+               ERROR(worker->conf, "Could not add the socket to epoll(): %s\n", strerror(-r));
+               goto ERROR;
+       }
+
+       // Enter the main loop...
+       for (;;) {
+               ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
+               if (ready < 0) {
+                       // We terminate gracefully when we receive a signal
+                       if (errno == EINTR) {
+                               r = 0;
+                               break;
+                       }
+
+                       r = -errno;
+                       ERROR(worker->conf, "epoll_wait() failed: %s\n", strerror(-r));
+                       goto ERROR;
+               }
+
+               // Loop through all sockets that are ready...
+               for (int i = 0; i < ready; i++) {
+                       fd = events[i].data.fd;
+
+                       // The client socket is the only descriptor we handle
+                       if (fd != worker->client)
+                               continue;
+
+                       if (events[i].events & EPOLLRDHUP) {
+                               DEBUG(worker->conf, "Connection %d has closed\n", fd);
+
+                               // There is no point in continuing any further
+                               // after the client went away. This is not an error.
+                               r = 0;
+                               goto ERROR;
+                       }
+
+                       // Handle incoming data
+                       if (events[i].events & EPOLLIN) {
+                               r = handle_connection_recv(worker->conf, &worker->stats, fd);
+                               if (r < 0)
+                                       goto ERROR;
+                       }
+
+                       // Handle outgoing data
+                       if (events[i].events & EPOLLOUT) {
+                               r = handle_connection_send(worker->conf, &worker->stats, fd);
+                               if (r < 0)
+                                       goto ERROR;
+                       }
+               }
+       }
+
+ERROR:
+       if (worker->epollfd >= 0)
+               close(worker->epollfd);
+       if (worker->client >= 0)
+               close(worker->client);
+
+       // Store the return code
+       worker->r = r;
+
+       return &worker->r;
+}
+
+// Creates a worker for an accepted client connection and starts its thread.
+//
+// On success the worker has taken over fd and closes it when its thread
+// terminates. On failure nothing is registered with the server and fd is left
+// open for the caller to close.
+//
+// Returns 0 on success or a negative error number (-ENOSPC if all worker
+// slots are taken).
+static int create_worker(struct fireperf_server* server, const int fd) {
+       struct fireperf_worker* worker = NULL;
+       unsigned int slot;
+       int r;
+
+       DEBUG(server->conf, "Creating a new worker for %d\n", fd);
+
+       // Find the first free slot. The slot table decides whether there is
+       // space for a new worker because slots are cleared when workers are freed.
+       for (slot = 0; slot < MAX_WORKERS; slot++) {
+               if (!server->workers[slot])
+                       break;
+       }
+
+       // Check if we have space for a new worker
+       if (slot >= MAX_WORKERS) {
+               ERROR(server->conf, "Maximum number of workers reached. Closing connection.\n");
+               return -ENOSPC;
+       }
+
+       // Allocate a new worker
+       worker = calloc(1, sizeof(*worker));
+       if (!worker)
+               return -errno;
+
+       // Store a reference to the configuration
+       worker->conf = server->conf;
+
+       // Store the client socket
+       worker->client = fd;
+
+       // Run the worker
+       r = pthread_create(&worker->thread, NULL, run_worker, worker);
+       if (r) {
+               ERROR(server->conf, "Could not launch thread: %s\n", strerror(r));
+
+               // No thread is using the worker, so it can be released right away
+               free_worker(worker);
+               return -r;
+       }
+
+       // Register the worker only now, so that every occupied slot refers to
+       // a thread that has actually been started and can be joined
+       server->workers[slot] = worker;
+
+       // We now have more workers
+       ++server->num_workers;
+
+       return 0;
+}
+
+// Builds the combined statistics of the server and all of its workers.
+//
+// The connection count and the byte totals start from the server's own stats.
+// Open connections, per-period byte counters and byte totals are then summed
+// over every occupied worker slot. Each worker's per-period counters are set
+// back to zero after they have been added.
+//
+// Always returns 0.
+static int cumulate_stats(struct fireperf_stats* stats, struct fireperf_server* server) {
+       // Start from what the server itself has counted
+       stats->connections          = server->stats.connections;
+       stats->total_bytes_received = server->stats.total_bytes_received;
+       stats->total_bytes_sent     = server->stats.total_bytes_sent;
+
+       // These are rebuilt from the workers every time
+       stats->open_connections     = 0;
+       stats->bytes_received       = 0;
+       stats->bytes_sent           = 0;
+
+       for (unsigned int slot = 0; slot < MAX_WORKERS; slot++) {
+               struct fireperf_worker* w = server->workers[slot];
+
+               // Only occupied slots contribute
+               if (w) {
+                       stats->open_connections     += w->stats.open_connections;
+
+                       stats->bytes_received       += w->stats.bytes_received;
+                       stats->total_bytes_received += w->stats.total_bytes_received;
+                       stats->bytes_sent           += w->stats.bytes_sent;
+                       stats->total_bytes_sent     += w->stats.total_bytes_sent;
+
+                       // The next period starts at zero
+                       w->stats.bytes_received = 0;
+                       w->stats.bytes_sent     = 0;
+               }
+       }
+
+       return 0;
+}
+
 static int enable_keepalive(struct fireperf_config* conf, int fd) {
        // Enable keepalive
        int flags = 1;
@@ -151,8 +407,10 @@ ERROR:
        return -1;
 }
 
-static int accept_connection(struct fireperf_config* conf, int sockfd) {
-       struct sockaddr_in6 addr;
+static int accept_connection(struct fireperf_server* server, int sockfd) {
+       struct sockaddr_in6 addr = {};
+       int r;
+
        socklen_t l = sizeof(addr);
 
        int fd = -1;
@@ -163,13 +421,38 @@ static int accept_connection(struct fireperf_config* conf, int sockfd) {
        } while (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
 
        if (fd < 0) {
-               ERROR(conf, "Could not accept a new connection: %s\n", strerror(errno));
+               ERROR(server->conf, "Could not accept a new connection: %s\n", strerror(errno));
                return -1;
        }
 
-       DEBUG(conf, "New connection accepted on socket %d\n", fd);
+       DEBUG(server->conf, "New connection accepted on socket %d\n", fd);
+
+       // A connection has been opened
+       server->stats.open_connections++;
+       server->stats.connections++;
+
+       // Close connections immediately when -x is set
+       if (server->conf->close) {
+               DEBUG(server->conf, "Closing connection %d\n", fd);
+               r = 0;
+
+               goto ERROR;
+       }
+
+       // Create a new worker
+       r = create_worker(server, fd);
+       if (r < 0) {
+               ERROR(server->conf, "Could not create worker: %s\n", strerror(-r));
+               goto ERROR;
+       }
 
        return fd;
+
+ERROR:
+       if (fd >= 0)
+               close(fd);
+
+       return r;
 }
 
 static int is_listening_socket(struct fireperf_config* conf, int* sockets, int fd) {
@@ -181,13 +464,18 @@ static int is_listening_socket(struct fireperf_config* conf, int* sockets, int f
        return 0;
 }
 
-int fireperf_server(struct fireperf_config* conf, struct fireperf_stats* stats,
-               int epollfd, int timerfd) {
+int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd) {
+       struct fireperf_server server = {
+               .conf = conf,
+       };
+       struct fireperf_stats stats = {};
+       uint64_t expirations = 0;
+       int r;
+
        DEBUG(conf, "Launching " PACKAGE_NAME " in server mode\n");
 
        int listening_sockets[conf->listening_sockets];
 
-       int r = 1;
        struct epoll_event ev;
        struct epoll_event events[EPOLL_MAX_EVENTS];
 
@@ -228,30 +516,12 @@ int fireperf_server(struct fireperf_config* conf, struct fireperf_stats* stats,
 
                        // The listening socket
                        if (is_listening_socket(conf, listening_sockets, fd)) {
-                               int connfd = accept_connection(conf, fd);
-                               if (connfd < 0)
+                               r = accept_connection(&server, fd);
+                               if (r < 0)
                                        goto ERROR;
 
-                               // Add the new socket to epoll()
-                               ev.data.fd = connfd;
-                               ev.events  = EPOLLIN|EPOLLRDHUP;
-                               if (!conf->keepalive_only)
-                                       ev.events |= EPOLLOUT;
-
-                               if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev)) {
-                                       ERROR(conf, "Could not add socket file descriptor to epoll(): %s\n",
-                                               strerror(errno));
-                                       goto ERROR;
-                               }
-
-                               // A connection has been opened
-                               stats->open_connections++;
-                               stats->connections++;
-
                        // Handle timer events
                        } else if (fd == timerfd) {
-                               uint64_t expirations;
-
                                // Read from the timer to disarm it
                                ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
                                if (bytes_read <= 0) {
@@ -259,53 +529,20 @@ int fireperf_server(struct fireperf_config* conf, struct fireperf_stats* stats,
                                        goto ERROR;
                                }
 
-                               r = fireperf_dump_stats(conf, stats, FIREPERF_MODE_SERVER);
+                               // Free any terminated workers
+                               r = free_terminated_workers(&server);
                                if (r)
                                        goto ERROR;
 
-                       // Handle any connection events
-                       } else {
-                               if (events[i].events & EPOLLRDHUP) {
-                                       DEBUG(conf, "Connection %d has closed\n", fd);
-
-                                       // Remove the file descriptor from epoll()
-                                       if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) {
-                                               ERROR(conf, "Could not remove socket file descriptfor from epoll(): %s\n",
-                                                       strerror(errno));
-                                       }
-
-                                       // Free up any resources
-                                       close(fd);
-
-                                       // This connection is now closed
-                                       stats->open_connections--;
-
-                                       // Skip processing anything else, because it would be pointless
-                                       continue;
-                               }
-
-                               // Close connections immediately when -x is set
-                               if (conf->close) {
-                                       DEBUG(conf, "Closing connection %d\n", fd);
-                                       close(fd);
-
-                                       stats->open_connections--;
-                                       continue;
-                               }
-
-                               // Handle incoming data
-                               if (events[i].events & EPOLLIN) {
-                                       r = handle_connection_recv(conf, stats, fd);
-                                       if (r < 0)
-                                               goto ERROR;
-                               }
+                               // Cumulate stats
+                               r = cumulate_stats(&stats, &server);
+                               if (r)
+                                       goto ERROR;
 
-                               // Handle outgoing data
-                               if (events[i].events & EPOLLOUT) {
-                                       r = handle_connection_send(conf, stats, fd);
-                                       if (r < 0)
-                                               goto ERROR;
-                               }
+                               // Print the stats
+                               r = fireperf_dump_stats(conf, &stats, FIREPERF_MODE_SERVER);
+                               if (r)
+                                       goto ERROR;
                        }
                }
        }
@@ -316,5 +553,18 @@ ERROR:
                        close(listening_sockets[i]);
        }
 
+       // Wait for all workers to terminate
+       for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+               if (server.workers[i]) {
+                       // Wait for the worker to terminate
+                       join_worker(server.workers[i]);
+
+                       // Free the worker
+                       free_worker(server.workers[i]);
+
+                       server.workers[i] = NULL;
+               }
+       }
+
        return r;
 }
index cd400901261b2ac08637a4d3a2770d9f2ed33444..b6984c0542568bd9f7eb09eddd02e388377db8f0 100644 (file)
@@ -23,7 +23,6 @@
 
 #include "main.h"
 
-int fireperf_server(struct fireperf_config* conf, struct fireperf_stats* stats,
-       int epollfd, int timerfd);
+int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd);
 
 #endif /* FIREPERF_SERVER_H */