]> git.ipfire.org Git - fireperf.git/commitdiff
server: Use the new worker
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 19 Sep 2024 07:50:56 +0000 (07:50 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 19 Sep 2024 07:50:56 +0000 (07:50 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/server.c
src/worker.c
src/worker.h

index 85316dd71c8e8a77f3e95ccc036c36ecb1b6a27b..c9d6c1cc9439310d562278be287373130584b682 100644 (file)
@@ -20,7 +20,6 @@
 
 #include <errno.h>
 #include <netinet/tcp.h>
-#include <pthread.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
 #include "main.h"
 #include "server.h"
 #include "util.h"
+#include "worker.h"
 
 #define MAX_WORKERS       128
 #define SOCKET_BACKLOG   1024
 
-struct fireperf_worker {
-       // Thread
-       pthread_t thread;
-
-       // Configuration
-       struct fireperf_config* conf;
-
-       // Collect stats
-       struct fireperf_stats stats;
-
-       // Client Socket
-       int client;
-
-       // Epoll
-       int epollfd;
-
-       // Return code
-       int r;
-};
-
 struct fireperf_server {
        // Configuration
        struct fireperf_config* conf;
 
-       // Collect stats
-       struct fireperf_stats stats;
+       // Count all connections
+       unsigned int connections;
 
        // Workers
        struct fireperf_worker* workers[MAX_WORKERS];
        unsigned int num_workers;
 };
 
-static int is_worker_alive(struct fireperf_worker* worker) {
-       return pthread_tryjoin_np(worker->thread, NULL);
-}
-
-static int join_worker(struct fireperf_worker* worker) {
-       int r;
-
-       // Join the worker
-       r = pthread_join(worker->thread, NULL);
-       if (r) {
-               ERROR(worker->conf, "Could not join worker: %s\n", strerror(errno));
-               return -errno;
-       }
-
-       return 0;
-}
+static struct fireperf_stats get_stats(struct fireperf_server* server) {
+       const struct fireperf_stats* s = NULL;
 
-static void free_worker(struct fireperf_worker* worker) {
-       free(worker);
-}
-
-static int free_terminated_workers(struct fireperf_server* server) {
-       int r;
-
-       for (unsigned int i = 0; i < MAX_WORKERS; i++) {
-               struct fireperf_worker* worker = server->workers[i];
-
-               // Skip any empty slots
-               if (!worker)
-                       continue;
-
-               // Skip any workers that are still alive
-               if (is_worker_alive(worker))
-                       continue;
-
-               // Free worker
-               free_worker(worker);
-
-               // Clear the slot
-               server->workers[i] = NULL;
-       }
-}
-
-static void* run_worker(void* w) {
-       struct fireperf_worker* worker = w;
-       int ready;
-       int fd = -1;
-       int r;
-
-       struct epoll_event events[EPOLL_MAX_EVENTS];
-
-       DEBUG(worker->conf, "New worker launched as %lu...\n", pthread_self());
-
-       // We have exactly one open connection
-       worker->stats.open_connections = 1;
-
-       worker->epollfd = -1;
-
-       // Initialize epoll()
-       worker->epollfd = epoll_create1(0);
-       if (worker->epollfd < 0) {
-               ERROR(worker->conf, "Could not initialize epoll(): %s\n", strerror(errno));
-               r = -errno;
-               goto ERROR;
-       }
-
-       struct epoll_event ev = {
-               .events  = EPOLLIN|EPOLLRDHUP,
-               .data.fd = worker->client,
+       struct fireperf_stats stats = {
+               .connections = server->connections,
        };
 
-       if (!worker->conf->keepalive_only)
-               ev.events |= EPOLLOUT;
-
-       // Add the socket to the loop
-       r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, worker->client, &ev);
-       if (r) {
-               ERROR(worker->conf, "Could not add the socket to epoll(): %s\n", strerror(errno));
-               r = -errno;
-               goto ERROR;
-       }
-
-       // Enter the main loop...
-       for (;;) {
-               ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
-               if (ready < 1) {
-                       // We terminate gracefully when we receive a signal
-                       if (errno == EINTR)
-                               break;
-
-                       ERROR(worker->conf, "epoll_wait() failed: %s\n", strerror(errno));
-                       r = -errno;
-                       goto ERROR;
-               }
-
-               // Loop through all sockets that are ready...
-               for (int i = 0; i < ready; i++) {
-                       fd = events[i].data.fd;
-
-                       // Handle the client socket
-                       if (worker->client == fd) {
-                               if (events[i].events & EPOLLRDHUP) {
-                                       DEBUG(worker->conf, "Connection %d has closed\n", fd);
-
-                                       // There is no point in continuing any further
-                                       // after the client went away
-                                       goto ERROR;
-                               }
-
-                               // Handle incoming data
-                               if (events[i].events & EPOLLIN) {
-                                       r = handle_connection_recv(worker->conf, &worker->stats, fd);
-                                       if (r < 0)
-                                               goto ERROR;
-                               }
-
-                               // Handle outgoing data
-                               if (events[i].events & EPOLLOUT) {
-                                       r = handle_connection_send(worker->conf, &worker->stats, fd);
-                                       if (r < 0)
-                                               goto ERROR;
-                               }
-                       }
-               }
-       }
-
-ERROR:
-       if (worker->epollfd >= 0)
-               close(worker->epollfd);
-       if (worker->client >= 0)
-               close(worker->client);
-
-       // Store the return code
-       worker->r = r;
-
-       return &worker->r;
-}
-
-static int create_worker(struct fireperf_server* server, const int fd) {
-       struct fireperf_worker* worker = NULL;
-       int r;
-
-       DEBUG(server->conf, "Creating a new worker for %d\n", fd);
-
-       // Check if we have space for a new worker
-       if (server->num_workers >= MAX_WORKERS) {
-               ERROR(server->conf, "Maximum number of workers reached. Closing connection.\n");
-               return -ENOSPC;
-       }
-
-       // Allocate a new worker
-       worker = calloc(1, sizeof(*worker));
-       if (!worker)
-               return -errno;
-
-       // Store a reference to the configuration
-       worker->conf = server->conf;
-
-       // Store the client socket
-       worker->client = fd;
-
-       // Store the worker
+       // Add up everything from all workers
        for (unsigned int i = 0; i < MAX_WORKERS; i++) {
-               if (server->workers[i])
+               if (!server->workers[i])
                        continue;
 
-               // Store a reference in the first free slot
-               server->workers[i] = worker;
-               break;
-       }
-
-       // We now have more workers
-       ++server->num_workers;
-
-       // Run the worker
-       r = pthread_create(&worker->thread, NULL, run_worker, worker);
-       if (r) {
-               ERROR(server->conf, "Could not launch thread: %s\n", strerror(r));
-               return -r;
-       }
-
-       return 0;
-}
-
-static int cumulate_stats(struct fireperf_stats* stats, struct fireperf_server* server) {
-       struct fireperf_worker* worker = NULL;
-
-       // Reset open connections
-       stats->open_connections = 0;
-
-       // Fetch the total number of connections
-       stats->connections = server->stats.connections;
-
-       // Reset what has been sent in the last period
-       stats->bytes_received       = 0;
-       stats->bytes_sent           = 0;
-
-       // Reset the stats to the server's stats
-       stats->total_bytes_received = server->stats.total_bytes_received;
-       stats->total_bytes_sent     = server->stats.total_bytes_sent;
-
-       // Add up all the worker's stats
-       for (unsigned int i = 0; i < MAX_WORKERS; i++) {
-               worker = server->workers[i];
-               if (!worker)
+               // Fetch stats
+               s = fireperf_worker_get_stats(server->workers[i]);
+               if (!s)
                        continue;
 
-               stats->open_connections     += worker->stats.open_connections;
+               // Sum up all open connections
+               stats.open_connections += s->open_connections;
 
-               stats->bytes_received       += worker->stats.bytes_received;
-               stats->total_bytes_received += worker->stats.total_bytes_received;
-               stats->bytes_sent           += worker->stats.bytes_sent;
-               stats->total_bytes_sent     += worker->stats.total_bytes_sent;
-
-               // Reset
-               worker->stats.bytes_received = 0;
-               worker->stats.bytes_sent     = 0;
+               // Sum up the transferred data
+               stats.bytes_received       += s->bytes_received;
+               stats.total_bytes_received += s->total_bytes_received;
+               stats.bytes_sent           += s->bytes_sent;
+               stats.total_bytes_sent     += s->total_bytes_sent;
        }
 
-       return 0;
+       return stats;
 }
 
 static int enable_keepalive(struct fireperf_config* conf, int fd) {
@@ -407,7 +195,50 @@ ERROR:
        return -1;
 }
 
+/*
+       This function selects the worker that is used for the an incoming connection
+*/
+static struct fireperf_worker* fetch_worker(struct fireperf_server* server) {
+       struct fireperf_worker* worker = NULL;
+       int r;
+
+       /*
+               XXX TODO
+               To keep things simple, we create a new worker for each call.
+
+               This should be replaced by a method that searches for the worker with
+               the least amount of connections and start up to as many workers as we
+               have CPU cores.
+       */
+
+       // Create a new worker
+       r = fireperf_worker_create(&worker, server->conf);
+       if (r < 0) {
+               ERROR(server->conf, "Could not create worker: %s\n", strerror(-r));
+               goto ERROR;
+       }
+
+       // Launch the worker
+       r = fireperf_worker_launch(worker);
+       if (r < 0) {
+               ERROR(server->conf, "Could not launch worker: %s\n", strerror(-r));
+               goto ERROR;
+       }
+
+       // Store a reference
+       server->workers[server->num_workers++] = worker;
+
+       return worker;
+
+ERROR:
+       if (worker)
+               fireperf_worker_free(worker);
+
+       return NULL;
+}
+
 static int accept_connection(struct fireperf_server* server, int sockfd) {
+       struct fireperf_worker* worker = NULL;
        struct sockaddr_in6 addr = {};
        int r;
 
@@ -428,29 +259,40 @@ static int accept_connection(struct fireperf_server* server, int sockfd) {
        DEBUG(server->conf, "New connection accepted on socket %d\n", fd);
 
        // A connection has been opened
-       server->stats.open_connections++;
-       server->stats.connections++;
+       server->connections++;
 
        // Close connections immediately when -x is set
        if (server->conf->close) {
-               DEBUG(server->conf, "Closing connection %d\n", fd);
                r = 0;
 
                goto ERROR;
        }
 
-       // Create a new worker
-       r = create_worker(server, fd);
+       // Find a worker to delegate this connection to
+       worker = fetch_worker(server);
+
+       // Close the connection if we could not find a worker
+       if (!worker) {
+               ERROR(server->conf, "Could not find a worker that could handle a new connection\n");
+               r = -EBUSY;
+               goto ERROR;
+       }
+
+       // Delegate the connection
+       r = fireperf_worker_delegate(worker, fd);
        if (r < 0) {
-               ERROR(server->conf, "Could not create worker: %s\n", strerror(-r));
+               ERROR(server->conf, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
                goto ERROR;
        }
 
-       return fd;
+       return 0;
 
 ERROR:
-       if (fd >= 0)
+       if (fd >= 0) {
+               DEBUG(server->conf, "Closing connection %d\n", fd);
+
                close(fd);
+       }
 
        return r;
 }
@@ -529,15 +371,8 @@ int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd) {
                                        goto ERROR;
                                }
 
-                               // Free any terminated workers
-                               r = free_terminated_workers(&server);
-                               if (r)
-                                       goto ERROR;
-
                                // Cumulate stats
-                               r = cumulate_stats(&stats, &server);
-                               if (r)
-                                       goto ERROR;
+                               stats = get_stats(&server);
 
                                // Print the stats
                                r = fireperf_dump_stats(conf, &stats, FIREPERF_MODE_SERVER);
@@ -553,18 +388,5 @@ ERROR:
                        close(listening_sockets[i]);
        }
 
-       // Wait for all workers to terminate
-       for (unsigned int i = 0; i < MAX_WORKERS; i++) {
-               if (server.workers[i]) {
-                       // Wait for the worker to terminate
-                       join_worker(server.workers[i]);
-
-                       // Free the worker
-                       free_worker(server.workers[i]);
-
-                       server.workers[i] = NULL;
-               }
-       }
-
        return r;
 }
index 2eef429f284e2f9cdb3e8dd4c44341c2dfa27f40..b51216ad522874c88806827ed237627c15b81f03 100644 (file)
@@ -34,16 +34,15 @@ struct fireperf_worker {
        // Configuration
        struct fireperf_config* conf;
 
+       // Collect stats
+       struct fireperf_stats stats;
+
        // Epoll
        int epollfd;
-};
-
-static void fireperf_worker_free(struct fireperf_worker* worker) {
-       if (worker->epollfd >= 0)
-               close(worker->epollfd);
 
-       free(worker);
-}
+       // Exit Code
+       int r;
+};
 
 int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_config* conf) {
        struct fireperf_worker* w = NULL;
@@ -79,15 +78,87 @@ ERROR:
        return r;
 }
 
+void fireperf_worker_free(struct fireperf_worker* worker) {
+       if (worker->epollfd >= 0)
+               close(worker->epollfd);
+
+       free(worker);
+}
+
 /*
        The main function of the worker.
 */
 static void* fireperf_worker_main(void* w) {
        struct fireperf_worker* worker = w;
+       int ready = 0;
+       int r;
 
        DEBUG(worker->conf, "New worker launched as %lu\n", pthread_self());
 
-       return 0;
+       struct epoll_event events[EPOLL_MAX_EVENTS];
+
+       // Enter the main loop...
+       for (;;) {
+               ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
+               if (ready < 1) {
+                       // We terminate gracefully when we receive a signal
+                       if (errno == EINTR)
+                               break;
+
+                       ERROR(worker->conf, "epoll_wait() failed: %m\n");
+                       r = -errno;
+                       goto ERROR;
+               }
+
+               // Loop through all sockets that are ready...
+               for (int i = 0; i < ready; i++) {
+                       int fd = events[i].data.fd;
+
+                       // Handle closed connections
+                       if (events[i].events & EPOLLRDHUP) {
+                               DEBUG(worker->conf, "Connection %d has closed\n", fd);
+
+                               // We now have one fewer connections
+                               worker->stats.open_connections--;
+
+                               // Remove the file descriptor from epoll()
+                               r = epoll_ctl(worker->epollfd, EPOLL_CTL_DEL, fd, NULL);
+                               if (r) {
+                                       ERROR(worker->conf, "Could not remove socket file descriptfor from epoll(): %m\n");
+                                       r = -errno;
+                                       goto ERROR;
+                               }
+
+                               // Free up any resources
+                               close(fd);
+
+                               // There is no point in continuing any further after the connection closed
+                               break;
+                       }
+
+                       // Handle incoming data
+                       if (events[i].events & EPOLLIN) {
+                               r = handle_connection_recv(worker->conf, &worker->stats, fd);
+                               if (r < 0)
+                                       goto ERROR;
+                       }
+
+                       // Handle outgoing data
+                       if (events[i].events & EPOLLOUT) {
+                               r = handle_connection_send(worker->conf, &worker->stats, fd);
+                               if (r < 0)
+                                       goto ERROR;
+                       }
+               }
+       }
+
+       DEBUG(worker->conf, "Worker has gracefully terminated\n");
+
+ERROR:
+       // Store the return code
+       worker->r = r;
+
+       return &worker->r;
 }
 
 int fireperf_worker_launch(struct fireperf_worker* worker) {
@@ -102,6 +173,41 @@ int fireperf_worker_launch(struct fireperf_worker* worker) {
        return r;
 }
 
+int fireperf_worker_terminate(struct fireperf_worker* worker) {
+       // If the worker has terminated, we return its exit code
+       if (!fireperf_worker_is_alive(worker))
+               return worker->r;
+
+       // XXX Otherwise we need to do something else
+       return 0;
+}
+
 int fireperf_worker_is_alive(struct fireperf_worker* worker) {
        return pthread_tryjoin_np(worker->thread, NULL);
 }
+
+int fireperf_worker_delegate(struct fireperf_worker* worker, int socket) {
+       int r;
+
+       struct epoll_event ev = {
+               .events  = EPOLLIN|EPOLLRDHUP,
+               .data.fd = socket,
+       };
+
+       if (!worker->conf->keepalive_only)
+               ev.events |= EPOLLOUT;
+
+       // Add the socket to the loop
+       r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev);
+       if (r)
+               ERROR(worker->conf, "Could not add the socket to epoll(): %m\n");
+
+       // We now have one more open connections
+       worker->stats.open_connections++;
+
+       return r;
+}
+
+const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker) {
+       return &worker->stats;
+}
index e7d182b2949b40047ba35b3ad6e3db7f988be5e3..fb962c65b307a94f84e9ea4ca243fcd0d85b7c80 100644 (file)
@@ -26,8 +26,14 @@ struct fireperf_worker;
 #include "main.h"
 
 int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_config* conf);
+void fireperf_worker_free(struct fireperf_worker* worker);
 
 int fireperf_worker_launch(struct fireperf_worker* worker);
+int fireperf_worker_terminate(struct fireperf_worker* worker);
 int fireperf_worker_is_alive(struct fireperf_worker* worker);
 
+int fireperf_worker_delegate(struct fireperf_worker* worker, int socket);
+
+const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker);
+
 #endif /* FIREPERF_WORKER_H */