From: Michael Tremer Date: Thu, 19 Sep 2024 08:59:48 +0000 (+0000) Subject: server: Move creating workers into the context X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=f0722105a3c247e3a8420a95fe6d9c4a06275256;p=fireperf.git server: Move creating workers into the context Signed-off-by: Michael Tremer --- diff --git a/src/ctx.c b/src/ctx.c index 69f8892..32a3add 100644 --- a/src/ctx.c +++ b/src/ctx.c @@ -25,9 +25,12 @@ #include #include #include +#include #include #include "ctx.h" +#include "logging.h" +#include "main.h" #include "random.h" static int parse_address(const char* string, struct in6_addr* address6) { @@ -281,3 +284,73 @@ ERROR: void fireperf_ctx_free(struct fireperf_ctx* ctx) { free(ctx); } + +/* + This function returns a free worker +*/ +struct fireperf_worker* fireperf_ctx_fetch_worker(struct fireperf_ctx* ctx) { + struct fireperf_worker* worker = NULL; + int r; + + /* + XXX TODO + To keep things simple, we create a new worker for each call. + + This should be replaced by a method that searches for the worker with + the least amount of connections and start up to as many workers as we + have CPU cores. + */ + + // Create a new worker + r = fireperf_worker_create(&worker, ctx); + if (r < 0) { + ERROR(ctx, "Could not create worker: %s\n", strerror(-r)); + goto ERROR; + } + + // Launch the worker + r = fireperf_worker_launch(worker); + if (r < 0) { + ERROR(ctx, "Could not launch worker: %s\n", strerror(-r)); + goto ERROR; + } + + // Store a reference + ctx->workers[ctx->num_workers++] = worker; + + return worker; + +ERROR: + if (worker) + fireperf_worker_free(worker); + + return NULL; +} + +struct fireperf_stats fireperf_ctx_get_stats(struct fireperf_ctx* ctx) { + const struct fireperf_stats* s = NULL; + + struct fireperf_stats stats = {}; + + // Add up everything from all workers + for (unsigned int i = 0; i < MAX_WORKERS; i++) { + if (!ctx->workers[i]) + continue; + + // Fetch stats + s = fireperf_worker_get_stats(ctx->workers[i]); + if (!s) + continue; + + // Sum up all open connections + stats.open_connections += s->open_connections; + + // Sum up the transferred data + stats.bytes_received += s->bytes_received; + stats.total_bytes_received += s->total_bytes_received; + stats.bytes_sent += s->bytes_sent; + stats.total_bytes_sent += s->total_bytes_sent; + } + + return stats; +} diff --git a/src/ctx.h b/src/ctx.h index 28a2546..23a8d13 100644 --- a/src/ctx.h +++ b/src/ctx.h @@ -52,12 +52,18 @@ struct fireperf_ctx { char pool[DEFAULT_RANDOM_POOL_SIZE]; // Workers - struct fireperf_workers* workers[MAX_WORKERS]; + struct fireperf_worker* workers[MAX_WORKERS]; unsigned int num_workers; unsigned int max_workers; }; +#include "main.h" +#include "worker.h" + int fireperf_ctx_create(struct fireperf_ctx** ctx, int argc, char* argv[]); void fireperf_ctx_free(struct fireperf_ctx* ctx); +struct fireperf_worker* fireperf_ctx_fetch_worker(struct fireperf_ctx* ctx); +struct fireperf_stats fireperf_ctx_get_stats(struct fireperf_ctx* ctx); + #endif /* FIREPERF_CTX_H */ diff --git a/src/server.c b/src/server.c index bab80f6..8136961 100644 --- a/src/server.c +++ b/src/server.c @@ -41,42 +41,8 @@ struct fireperf_server { // Count all connections unsigned int connections; - - // Workers - struct fireperf_worker* workers[MAX_WORKERS]; - unsigned int num_workers; }; -static struct fireperf_stats get_stats(struct fireperf_server* server) { - const struct fireperf_stats* s = NULL; - - struct fireperf_stats stats = { - .connections = server->connections, - }; - - // Add up everything from all workers - for (unsigned int i = 0; i < MAX_WORKERS; i++) { - if (!server->workers[i]) - continue; - - // Fetch stats - s = fireperf_worker_get_stats(server->workers[i]); - if (!s) - continue; - - // Sum up all open connections - stats.open_connections += s->open_connections; - - // Sum up the transferred data - stats.bytes_received += s->bytes_received; - stats.total_bytes_received += s->total_bytes_received; - stats.bytes_sent += s->bytes_sent; - stats.total_bytes_sent += s->total_bytes_sent; - } - - return stats; -} - static int enable_keepalive(struct fireperf_ctx* ctx, int fd) { // Enable keepalive int flags = 1; @@ -194,48 +160,6 @@ ERROR: return -1; } -/* - This function selects the worker that is used for the an incoming connection -*/ -static struct fireperf_worker* fetch_worker(struct fireperf_server* server) { - struct fireperf_worker* worker = NULL; - int r; - - /* - XXX TODO - To keep things simple, we create a new worker for each call. - - This should be replaced by a method that searches for the worker with - the least amount of connections and start up to as many workers as we - have CPU cores. - */ - - // Create a new worker - r = fireperf_worker_create(&worker, server->ctx); - if (r < 0) { - ERROR(server->ctx, "Could not create worker: %s\n", strerror(-r)); - goto ERROR; - } - - // Launch the worker - r = fireperf_worker_launch(worker); - if (r < 0) { - ERROR(server->ctx, "Could not launch worker: %s\n", strerror(-r)); - goto ERROR; - } - - // Store a reference - server->workers[server->num_workers++] = worker; - - return worker; - -ERROR: - if (worker) - fireperf_worker_free(worker); - - return NULL; -} - static int accept_connection(struct fireperf_server* server, int sockfd) { struct fireperf_worker* worker = NULL; struct sockaddr_in6 addr = {}; @@ -268,7 +192,7 @@ static int accept_connection(struct fireperf_server* server, int sockfd) { } // Find a worker to delegate this connection to - worker = fetch_worker(server); + worker = fireperf_ctx_fetch_worker(server->ctx); // Close the connection if we could not find a worker if (!worker) { @@ -371,7 +295,7 @@ int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd) { } // Cumulate stats - stats = get_stats(&server); + stats = fireperf_ctx_get_stats(ctx); // Print the stats r = fireperf_dump_stats(ctx, &stats, FIREPERF_MODE_SERVER);