From: Michael Tremer Date: Thu, 19 Sep 2024 07:50:56 +0000 (+0000) Subject: server: Use the new worker X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=edd32873afd625e3b6997105a21df3c8905e8458;p=fireperf.git server: Use the new worker Signed-off-by: Michael Tremer --- diff --git a/src/server.c b/src/server.c index 85316dd..c9d6c1c 100644 --- a/src/server.c +++ b/src/server.c @@ -20,7 +20,6 @@ #include #include -#include #include #include #include @@ -32,262 +31,51 @@ #include "main.h" #include "server.h" #include "util.h" +#include "worker.h" #define MAX_WORKERS 128 #define SOCKET_BACKLOG 1024 -struct fireperf_worker { - // Thread - pthread_t thread; - - // Configuration - struct fireperf_config* conf; - - // Collect stats - struct fireperf_stats stats; - - // Client Socket - int client; - - // Epoll - int epollfd; - - // Return code - int r; -}; - struct fireperf_server { // Configuration struct fireperf_config* conf; - // Collect stats - struct fireperf_stats stats; + // Count all connections + unsigned int connections; // Workers struct fireperf_worker* workers[MAX_WORKERS]; unsigned int num_workers; }; -static int is_worker_alive(struct fireperf_worker* worker) { - return pthread_tryjoin_np(worker->thread, NULL); -} - -static int join_worker(struct fireperf_worker* worker) { - int r; - - // Join the worker - r = pthread_join(worker->thread, NULL); - if (r) { - ERROR(worker->conf, "Could not join worker: %s\n", strerror(errno)); - return -errno; - } - - return 0; -} +static struct fireperf_stats get_stats(struct fireperf_server* server) { + const struct fireperf_stats* s = NULL; -static void free_worker(struct fireperf_worker* worker) { - free(worker); -} - -static int free_terminated_workers(struct fireperf_server* server) { - int r; - - for (unsigned int i = 0; i < MAX_WORKERS; i++) { - struct fireperf_worker* worker = server->workers[i]; - - // Skip any empty slots - if (!worker) - continue; - - // Skip any workers that are still alive - if (is_worker_alive(worker)) - continue; - - // Free worker - free_worker(worker); - - // Clear the slot - server->workers[i] = NULL; - } -} - -static void* run_worker(void* w) { - struct fireperf_worker* worker = w; - int ready; - int fd = -1; - int r; - - struct epoll_event events[EPOLL_MAX_EVENTS]; - - DEBUG(worker->conf, "New worker launched as %lu...\n", pthread_self()); - - // We have exactly one open connection - worker->stats.open_connections = 1; - - worker->epollfd = -1; - - // Initialize epoll() - worker->epollfd = epoll_create1(0); - if (worker->epollfd < 0) { - ERROR(worker->conf, "Could not initialize epoll(): %s\n", strerror(errno)); - r = -errno; - goto ERROR; - } - - struct epoll_event ev = { - .events = EPOLLIN|EPOLLRDHUP, - .data.fd = worker->client, + struct fireperf_stats stats = { + .connections = server->connections, }; - if (!worker->conf->keepalive_only) - ev.events |= EPOLLOUT; - - // Add the socket to the loop - r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, worker->client, &ev); - if (r) { - ERROR(worker->conf, "Could not add the socket to epoll(): %s\n", strerror(errno)); - r = -errno; - goto ERROR; - } - - // Enter the main loop... - for (;;) { - ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1); - if (ready < 1) { - // We terminate gracefully when we receive a signal - if (errno == EINTR) - break; - - ERROR(worker->conf, "epoll_wait() failed: %s\n", strerror(errno)); - r = -errno; - goto ERROR; - } - - // Loop through all sockets that are ready... - for (int i = 0; i < ready; i++) { - fd = events[i].data.fd; - - // Handle the client socket - if (worker->client == fd) { - if (events[i].events & EPOLLRDHUP) { - DEBUG(worker->conf, "Connection %d has closed\n", fd); - - // There is no point in continuing any further - // after the client went away - goto ERROR; - } - - // Handle incoming data - if (events[i].events & EPOLLIN) { - r = handle_connection_recv(worker->conf, &worker->stats, fd); - if (r < 0) - goto ERROR; - } - - // Handle outgoing data - if (events[i].events & EPOLLOUT) { - r = handle_connection_send(worker->conf, &worker->stats, fd); - if (r < 0) - goto ERROR; - } - } - } - } - -ERROR: - if (worker->epollfd >= 0) - close(worker->epollfd); - if (worker->client >= 0) - close(worker->client); - - // Store the return code - worker->r = r; - - return &worker->r; -} - -static int create_worker(struct fireperf_server* server, const int fd) { - struct fireperf_worker* worker = NULL; - int r; - - DEBUG(server->conf, "Creating a new worker for %d\n", fd); - - // Check if we have space for a new worker - if (server->num_workers >= MAX_WORKERS) { - ERROR(server->conf, "Maximum number of workers reached. Closing connection.\n"); - return -ENOSPC; - } - - // Allocate a new worker - worker = calloc(1, sizeof(*worker)); - if (!worker) - return -errno; - - // Store a reference to the configuration - worker->conf = server->conf; - - // Store the client socket - worker->client = fd; - - // Store the worker + // Add up everything from all workers for (unsigned int i = 0; i < MAX_WORKERS; i++) { - if (server->workers[i]) + if (!server->workers[i]) continue; - // Store a reference in the first free slot - server->workers[i] = worker; - break; - } - - // We now have more workers - ++server->num_workers; - - // Run the worker - r = pthread_create(&worker->thread, NULL, run_worker, worker); - if (r) { - ERROR(server->conf, "Could not launch thread: %s\n", strerror(r)); - return -r; - } - - return 0; -} - -static int cumulate_stats(struct fireperf_stats* stats, struct fireperf_server* server) { - struct fireperf_worker* worker = NULL; - - // Reset open connections - stats->open_connections = 0; - - // Fetch the total number of connections - stats->connections = server->stats.connections; - - // Reset what has been sent in the last period - stats->bytes_received = 0; - stats->bytes_sent = 0; - - // Reset the stats to the server's stats - stats->total_bytes_received = server->stats.total_bytes_received; - stats->total_bytes_sent = server->stats.total_bytes_sent; - - // Add up all the worker's stats - for (unsigned int i = 0; i < MAX_WORKERS; i++) { - worker = server->workers[i]; - if (!worker) + // Fetch stats + s = fireperf_worker_get_stats(server->workers[i]); + if (!s) continue; - stats->open_connections += worker->stats.open_connections; + // Sum up all open connections + stats.open_connections += s->open_connections; - stats->bytes_received += worker->stats.bytes_received; - stats->total_bytes_received += worker->stats.total_bytes_received; - stats->bytes_sent += worker->stats.bytes_sent; - stats->total_bytes_sent += worker->stats.total_bytes_sent; - - // Reset - worker->stats.bytes_received = 0; - worker->stats.bytes_sent = 0; + // Sum up the transferred data + stats.bytes_received += s->bytes_received; + stats.total_bytes_received += s->total_bytes_received; + stats.bytes_sent += s->bytes_sent; + stats.total_bytes_sent += s->total_bytes_sent; } - return 0; + return stats; } static int enable_keepalive(struct fireperf_config* conf, int fd) { @@ -407,7 +195,50 @@ ERROR: return -1; } +/* + This function selects the worker that is used for the an incoming connection +*/ +static struct fireperf_worker* fetch_worker(struct fireperf_server* server) { + struct fireperf_worker* worker = NULL; + int r; + + /* + XXX TODO + To keep things simple, we create a new worker for each call. + + This should be replaced by a method that searches for the worker with + the least amount of connections and start up to as many workers as we + have CPU cores. + */ + + // Create a new worker + r = fireperf_worker_create(&worker, server->conf); + if (r < 0) { + ERROR(server->conf, "Could not create worker: %s\n", strerror(-r)); + goto ERROR; + } + + // Launch the worker + r = fireperf_worker_launch(worker); + if (r < 0) { + ERROR(server->conf, "Could not launch worker: %s\n", strerror(-r)); + goto ERROR; + } + + // Store a reference + server->workers[server->num_workers++] = worker; + + return worker; + +ERROR: + if (worker) + fireperf_worker_free(worker); + + return NULL; +} + static int accept_connection(struct fireperf_server* server, int sockfd) { + struct fireperf_worker* worker = NULL; struct sockaddr_in6 addr = {}; int r; @@ -428,29 +259,40 @@ static int accept_connection(struct fireperf_server* server, int sockfd) { DEBUG(server->conf, "New connection accepted on socket %d\n", fd); // A connection has been opened - server->stats.open_connections++; - server->stats.connections++; + server->connections++; // Close connections immediately when -x is set if (server->conf->close) { - DEBUG(server->conf, "Closing connection %d\n", fd); r = 0; goto ERROR; } - // Create a new worker - r = create_worker(server, fd); + // Find a worker to delegate this connection to + worker = fetch_worker(server); + + // Close the connection if we could not find a worker + if (!worker) { + ERROR(server->conf, "Could not find a worker that could handle a new connection\n"); + r = -EBUSY; + goto ERROR; + } + + // Delegate the connection + r = fireperf_worker_delegate(worker, fd); if (r < 0) { - ERROR(server->conf, "Could not create worker: %s\n", strerror(-r)); + ERROR(server->conf, "Could not delegate a new connection to a worker: %s\n", strerror(-r)); goto ERROR; } - return fd; + return 0; ERROR: - if (fd >= 0) + if (fd >= 0) { + DEBUG(server->conf, "Closing connection %d\n", fd); + close(fd); + } return r; } @@ -529,15 +371,8 @@ int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd) { goto ERROR; } - // Free any terminated workers - r = free_terminated_workers(&server); - if (r) - goto ERROR; - // Cumulate stats - r = cumulate_stats(&stats, &server); - if (r) - goto ERROR; + stats = get_stats(&server); // Print the stats r = fireperf_dump_stats(conf, &stats, FIREPERF_MODE_SERVER); @@ -553,18 +388,5 @@ ERROR: close(listening_sockets[i]); } - // Wait for all workers to terminate - for (unsigned int i = 0; i < MAX_WORKERS; i++) { - if (server.workers[i]) { - // Wait for the worker to terminate - join_worker(server.workers[i]); - - // Free the worker - free_worker(server.workers[i]); - - server.workers[i] = NULL; - } - } - return r; } diff --git a/src/worker.c b/src/worker.c index 2eef429..b51216a 100644 --- a/src/worker.c +++ b/src/worker.c @@ -34,16 +34,15 @@ struct fireperf_worker { // Configuration struct fireperf_config* conf; + // Collect stats + struct fireperf_stats stats; + // Epoll int epollfd; -}; - -static void fireperf_worker_free(struct fireperf_worker* worker) { - if (worker->epollfd >= 0) - close(worker->epollfd); - free(worker); -} + // Exit Code + int r; +}; int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_config* conf) { struct fireperf_worker* w = NULL; @@ -79,15 +78,87 @@ ERROR: return r; } +void fireperf_worker_free(struct fireperf_worker* worker) { + if (worker->epollfd >= 0) + close(worker->epollfd); + + free(worker); +} + /* The main function of the worker. */ static void* fireperf_worker_main(void* w) { struct fireperf_worker* worker = w; + int ready = 0; + int r; DEBUG(worker->conf, "New worker launched as %lu\n", pthread_self()); - return 0; + struct epoll_event events[EPOLL_MAX_EVENTS]; + + // Enter the main loop... + for (;;) { + ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1); + if (ready < 1) { + // We terminate gracefully when we receive a signal + if (errno == EINTR) + break; + + ERROR(worker->conf, "epoll_wait() failed: %m\n"); + r = -errno; + goto ERROR; + } + + // Loop through all sockets that are ready... + for (int i = 0; i < ready; i++) { + int fd = events[i].data.fd; + + // Handle closed connections + if (events[i].events & EPOLLRDHUP) { + DEBUG(worker->conf, "Connection %d has closed\n", fd); + + // We now have one fewer connections + worker->stats.open_connections--; + + // Remove the file descriptor from epoll() + r = epoll_ctl(worker->epollfd, EPOLL_CTL_DEL, fd, NULL); + if (r) { + ERROR(worker->conf, "Could not remove socket file descriptfor from epoll(): %m\n"); + r = -errno; + goto ERROR; + } + + // Free up any resources + close(fd); + + // There is no point in continuing any further after the connection closed + break; + } + + // Handle incoming data + if (events[i].events & EPOLLIN) { + r = handle_connection_recv(worker->conf, &worker->stats, fd); + if (r < 0) + goto ERROR; + } + + // Handle outgoing data + if (events[i].events & EPOLLOUT) { + r = handle_connection_send(worker->conf, &worker->stats, fd); + if (r < 0) + goto ERROR; + } + } + } + + DEBUG(worker->conf, "Worker has gracefully terminated\n"); + +ERROR: + // Store the return code + worker->r = r; + + return &worker->r; } int fireperf_worker_launch(struct fireperf_worker* worker) { @@ -102,6 +173,41 @@ int fireperf_worker_launch(struct fireperf_worker* worker) { return r; } +int fireperf_worker_terminate(struct fireperf_worker* worker) { + // If the worker has terminated, we return its exit code + if (!fireperf_worker_is_alive(worker)) + return worker->r; + + // XXX Otherwise we need to do something else + return 0; +} + int fireperf_worker_is_alive(struct fireperf_worker* worker) { return pthread_tryjoin_np(worker->thread, NULL); } + +int fireperf_worker_delegate(struct fireperf_worker* worker, int socket) { + int r; + + struct epoll_event ev = { + .events = EPOLLIN|EPOLLRDHUP, + .data.fd = socket, + }; + + if (!worker->conf->keepalive_only) + ev.events |= EPOLLOUT; + + // Add the socket to the loop + r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev); + if (r) + ERROR(worker->conf, "Could not add the socket to epoll(): %m\n"); + + // We now have one more open connections + worker->stats.open_connections++; + + return r; +} + +const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker) { + return &worker->stats; +} diff --git a/src/worker.h b/src/worker.h index e7d182b..fb962c6 100644 --- a/src/worker.h +++ b/src/worker.h @@ -26,8 +26,14 @@ struct fireperf_worker; #include "main.h" int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_config* conf); +void fireperf_worker_free(struct fireperf_worker* worker); int fireperf_worker_launch(struct fireperf_worker* worker); +int fireperf_worker_terminate(struct fireperf_worker* worker); int fireperf_worker_is_alive(struct fireperf_worker* worker); +int fireperf_worker_delegate(struct fireperf_worker* worker, int socket); + +const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker); + #endif /* FIREPERF_WORKER_H */