From: Michael Tremer Date: Thu, 19 Sep 2024 10:35:02 +0000 (+0000) Subject: main: Merge client and server to use the workers X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=6a542df2f7044dfbdcf04763a6c4c4fcc05670e7;p=fireperf.git main: Merge client and server to use the workers Signed-off-by: Michael Tremer --- diff --git a/src/client.c b/src/client.c index 35f9c92..e5618b1 100644 --- a/src/client.c +++ b/src/client.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include "client.h" @@ -33,23 +34,57 @@ #include "random.h" #include "util.h" -// Set to one when the timeout has expired -static int timeout_expired = 0; +struct fireperf_client_state { + // For the timeout + int timerfd; -static void handle_SIGALRM(int signal) { - switch (signal) { - // Terminate after timeout has expired - case SIGALRM: - timeout_expired = 1; - break; - } + // How many open connections? + unsigned int connections; +}; + +static int setup_timer(struct fireperf_ctx* ctx, int epollfd) { + int timerfd = -1; + int r; + + // Create timerfd() + timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC); + if (timerfd < 0) + return -errno; + + struct epoll_event ev = { + .events = EPOLLIN, + .data.fd = timerfd, + }; + + // Register the timer with the event loop + r = epoll_ctl(epollfd, EPOLL_CTL_ADD, timerfd, &ev); + if (r) + return -errno; + + // Let the timer ping us once a second + struct itimerspec timer = { + .it_value.tv_sec = ctx->timeout, + }; + + // Arm the timer + r = timerfd_settime(timerfd, 0, &timer, NULL); + if (r) + return -errno; + + return timerfd; } -static int open_connection(struct fireperf_ctx* ctx) { +static int open_connection(struct fireperf_ctx* ctx, + struct fireperf_client_state* state, int epollfd) { + struct fireperf_worker* worker = NULL; + int fd; + int r; + // Open a new socket - int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); + fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); if (fd < 0) { - ERROR(ctx, "Could not open socket: %s\n", strerror(errno)); + ERROR(ctx, "Could not open socket: %m\n"); + r = -errno; goto ERROR; } @@ -66,150 +101,130 @@ static int open_connection(struct fireperf_ctx* ctx) { }; // Set socket buffer sizes - int r = set_socket_buffer_sizes(ctx, fd); + r = set_socket_buffer_sizes(ctx, fd); if (r) goto ERROR; // Connect to the server r = connect(fd, &peer, sizeof(peer)); - if (r && (errno != EINPROGRESS)) { - ERROR(ctx, "Could not connect to server: %s\n", strerror(errno)); + if (r) { + switch (errno) { + case EINPROGRESS: + break; + + default: + ERROR(ctx, "Could not connect to server: %m\n"); + r = -errno; + goto ERROR; + } + } + + // Find a worker to delegate this connection to + worker = fireperf_ctx_fetch_worker(ctx); + + // Close the connection if we could not find a worker + if (!worker) { + ERROR(ctx, "Could not find a worker that could handle a new connection\n"); + r = -EBUSY; goto ERROR; } - return fd; + int events = EPOLLIN; + + // Let us know when the socket is ready for receiving data + if (!ctx->keepalive_only) + events |= EPOLLIN; + + // In duplex mode, we send data, too + if (ctx->duplex) + events |= EPOLLOUT; + + // Delegate the connection + r = fireperf_worker_delegate(worker, events, fd); + if (r < 0) { + ERROR(ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r)); + goto ERROR; + } + + // Increment the number of open connections + state->connections++; + + return 0; ERROR: - if (fd > 0) + if (fd >= 0) close(fd); - return -1; + return r; } -int fireperf_client(struct fireperf_ctx* ctx, struct fireperf_stats* stats, - int epollfd, int timerfd) { - DEBUG(ctx, "Launching " PACKAGE_NAME " in client mode\n"); - - int r = 1; +static int open_connections(struct fireperf_ctx* ctx, + struct fireperf_client_state* state, int epollfd) { + int r = 0; - struct epoll_event ev = { - .events = EPOLLIN, - }; - struct epoll_event events[EPOLL_MAX_EVENTS]; + // If we don't have enough open connections, we try to open more + while (state->connections < ctx->parallel) { + r = open_connection(ctx, state, epollfd); + if (r) + break; + } - // Let us know when the socket is ready for receiving data - if (!ctx->keepalive_only) - ev.events |= EPOLLIN; + return r; +} - // In duplex mode, we send data, too - if (ctx->duplex) - ev.events |= EPOLLOUT; +int fireperf_client_init(struct fireperf_ctx* ctx, void** data, int epollfd) { + struct fireperf_client_state* state = NULL; + int r; - DEBUG(ctx, "Opening %lu connections...\n", ctx->parallel); + // Allocate state + state = calloc(1, sizeof(*state)); + if (!state) + return -errno; - // ctxigure timeout if set + // Setup a timer for the timeout if (ctx->timeout) { - // Register signal handler - signal(SIGALRM, handle_SIGALRM); - - alarm(ctx->timeout); + state->timerfd = setup_timer(ctx, epollfd); + if (state->timerfd < 0) + return -state->timerfd; + } else { + state->timerfd = -1; } - DEBUG(ctx, "Entering main loop...\n"); + // Open some initial connections + r = open_connections(ctx, state, epollfd); + if (r) + goto ERROR; - while (!ctx->terminated && !timeout_expired) { - // Open connections - while (stats->open_connections < ctx->parallel) { - int fd = open_connection(ctx); - if (fd < 0) - continue; + // Return the state + *data = state; - ev.data.fd = fd; + return 0; - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) { - ERROR(ctx, "Could not add socket file descriptor to epoll(): %s\n", - strerror(errno)); - goto ERROR; - } +ERROR: + if (state) + fireperf_client_free(ctx, state); - stats->open_connections++; - stats->connections++; - } + return r; +} - int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1); - if (fds < 1) { - // We terminate gracefully when we receive a signal - if (errno == EINTR) - break; +int fireperf_client_handle(struct fireperf_ctx* ctx, void* data, int fd) { + struct fireperf_client_state* state = data; - ERROR(ctx, "epoll_wait() failed: %s\n", strerror(errno)); - goto ERROR; - } + // Check if the timeout has fired + if (state->timerfd == fd) { + DEBUG(ctx, "Timeout reached\n"); - for (int i = 0; i < fds; i++) { - int fd = events[i].data.fd; - - // What type of event are we handling? - - // Handle timer events - if (fd == timerfd) { - uint64_t expirations; - - // Read from the timer to disarm it - ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations)); - if (bytes_read <= 0) { - ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno)); - goto ERROR; - } - - r = fireperf_dump_stats(ctx, stats, FIREPERF_MODE_CLIENT); - if (r) - goto ERROR; - - // Handle connection sockets - } else { - // Has the socket been disconnected? - if (events[i].events & EPOLLHUP) { - if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) { - ERROR(ctx, "Could not remove socket file descriptor from epoll(): %s\n", - strerror(errno)); - goto ERROR; - } - - close(fd); - - stats->open_connections--; - } else { - // Close connections immediately when -x is set - if (ctx->close) { - DEBUG(ctx, "Closing connection %d\n", fd); - close(fd); - - stats->open_connections--; - continue; - } - - // Handle incoming data - if (events[i].events & EPOLLIN) { - r = handle_connection_recv(ctx, stats, fd); - if (r < 0) - goto ERROR; - } - - // Handle outgoing data - if (events[i].events & EPOLLOUT) { - r = handle_connection_send(ctx, stats, fd); - if (r) - goto ERROR; - } - } - } - } + return -ETIME; } - // All okay - r = 0; + return 0; +} -ERROR: - return r; +void fireperf_client_free(struct fireperf_ctx* ctx, void* data) { + struct fireperf_client_state* state = NULL; + + if (state->timerfd >= 0) + close(state->timerfd); + + free(state); } diff --git a/src/client.h b/src/client.h index 9ff4b76..f17bf89 100644 --- a/src/client.h +++ b/src/client.h @@ -22,9 +22,9 @@ #define FIREPERF_CLIENT_H #include "ctx.h" -#include "main.h" -int fireperf_client(struct fireperf_ctx* ctx, struct fireperf_stats* stats, - int epollfd, int timerfd); +int fireperf_client_init(struct fireperf_ctx* ctx, void** data, int epollfd); +int fireperf_client_handle(struct fireperf_ctx* ctx, void* data, int fd); +void fireperf_client_free(struct fireperf_ctx* ctx, void* data); #endif /* FIREPERF_CLIENT_H */ diff --git a/src/main.c b/src/main.c index 7e47afd..def8f97 100644 --- a/src/main.c +++ b/src/main.c @@ -96,15 +96,29 @@ static int set_limits(struct fireperf_ctx* ctx) { int main(int argc, char* argv[]) { struct fireperf_ctx* ctx = NULL; struct fireperf_stats stats = { 0 }; - int r; + uint64_t expirations = 0; + void* data = NULL; int epollfd = -1; int timerfd = -1; + int ready; + int r; // Create a new context r = fireperf_ctx_create(&ctx, argc, argv); if (r) return r; + switch (ctx->mode) { + case FIREPERF_MODE_CLIENT: + case FIREPERF_MODE_SERVER: + break; + + case FIREPERF_MODE_NONE: + fprintf(stderr, "No mode selected\n"); + r = 2; + goto ERROR; + } + // Set limits r = set_limits(ctx); if (r) @@ -125,20 +139,95 @@ int main(int argc, char* argv[]) { goto ERROR; } + // Initialize the client/server switch (ctx->mode) { case FIREPERF_MODE_CLIENT: - return fireperf_client(ctx, &stats, epollfd, timerfd); + r = fireperf_client_init(ctx, &data, epollfd); + if (r) + goto ERROR; + break; case FIREPERF_MODE_SERVER: - return fireperf_server(ctx, epollfd, timerfd); - - case FIREPERF_MODE_NONE: - fprintf(stderr, "No mode selected\n"); - r = 2; + r = fireperf_server_init(ctx, &data, epollfd); + if (r) + goto ERROR; break; + + default: + abort(); + } + + struct epoll_event events[EPOLL_MAX_EVENTS]; + + // Main loop + for (;;) { + ready = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1); + if (ready < 1) { + // We terminate gracefully when we receive a signal + if (errno == EINTR) + break; + + ERROR(ctx, "epoll_wait() failed: %m\n"); + goto ERROR; + } + + // Iterate over all file descriptors that have activity + for (int i = 0; i < ready; i++) { + int fd = events[i].data.fd; + + // Handle timer events + if (fd == timerfd) { + // Read from the timer to disarm it + ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations)); + if (bytes_read <= 0) { + ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno)); + goto ERROR; + } + + // Cumulate stats + stats = fireperf_ctx_get_stats(ctx); + + // Print the stats + r = fireperf_dump_stats(ctx, &stats, ctx->mode); + if (r) + goto ERROR; + + // Handle everything else + } else { + switch (ctx->mode) { + case FIREPERF_MODE_CLIENT: + r = fireperf_client_handle(ctx, data, fd); + if (r) + goto ERROR; + break; + + case FIREPERF_MODE_SERVER: + r = fireperf_server_handle(ctx, data, fd); + if (r) + goto ERROR; + break; + + default: + abort(); + } + } + } } ERROR: + switch (ctx->mode) { + case FIREPERF_MODE_CLIENT: + fireperf_client_free(ctx, data); + break; + + case FIREPERF_MODE_SERVER: + fireperf_server_free(ctx, data); + break; + + default: + abort(); + } + if (epollfd > 0) close(epollfd); if (timerfd > 0) diff --git a/src/server.c b/src/server.c index 8136961..0aab9b0 100644 --- a/src/server.c +++ b/src/server.c @@ -24,7 +24,6 @@ #include #include #include -#include #include #include "constants.h" @@ -32,17 +31,8 @@ #include "logging.h" #include "main.h" #include "server.h" -#include "util.h" #include "worker.h" -struct fireperf_server { - // ctxiguration - struct fireperf_ctx* ctx; - - // Count all connections - unsigned int connections; -}; - static int enable_keepalive(struct fireperf_ctx* ctx, int fd) { // Enable keepalive int flags = 1; @@ -93,17 +83,19 @@ static int enable_keepalive(struct fireperf_ctx* ctx, int fd) { return 0; } -static int create_socket(struct fireperf_ctx* ctx, int i) { +static int create_socket(struct fireperf_ctx* ctx, int epollfd, unsigned int i) { + int flags = 1; + int fd; int r; // Open a new socket - int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); + fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0); if (fd < 0) { ERROR(ctx, "Could not open socket: %s\n", strerror(errno)); goto ERROR; } - int flags = 1; + // Enable re-use port r = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &flags, sizeof(flags)); if (r) { ERROR(ctx, "Could not set SO_REUSEPORT on socket %d: %s\n", @@ -139,14 +131,30 @@ static int create_socket(struct fireperf_ctx* ctx, int i) { // Bind it to the selected port r = bind(fd, &addr, sizeof(addr)); if (r) { - ERROR(ctx, "Could not bind socket: %s\n", strerror(errno)); + ERROR(ctx, "Could not bind socket: %m\n"); + r = -errno; goto ERROR; } // Listen r = listen(fd, SOCKET_BACKLOG); if (r) { - ERROR(ctx, "Could not listen on socket: %s\n", strerror(errno)); + ERROR(ctx, "Could not listen on socket: %m\n"); + r = -errno; + goto ERROR; + } + + // Add listening socket to epoll + struct epoll_event ev = { + .events = EPOLLIN, + .data.fd = fd, + }; + + // Add the socket to the event loop + r = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev); + if (r) { + ERROR(ctx, "Could not add socket file descriptor to epoll(): %m\n"); + r = -errno; goto ERROR; } @@ -157,159 +165,80 @@ static int create_socket(struct fireperf_ctx* ctx, int i) { ERROR: close(fd); - return -1; + return r; } -static int accept_connection(struct fireperf_server* server, int sockfd) { - struct fireperf_worker* worker = NULL; - struct sockaddr_in6 addr = {}; +int fireperf_server_init(struct fireperf_ctx* ctx, void** data, int epollfd) { int r; - socklen_t l = sizeof(addr); + // Create listening sockets + for (unsigned int i = 0; i < ctx->listening_sockets; i++) { + r = create_socket(ctx, epollfd, i); + if (r < 0) + return r; + } + return 0; +} + +int fireperf_server_handle(struct fireperf_ctx* ctx, void* data, int sockfd) { + struct fireperf_worker* worker = NULL; + struct sockaddr_in6 addr = {}; int fd = -1; + int r = 0; + + socklen_t l = sizeof(addr); // The listening socket is ready, there is a new connection waiting to be accepted do { fd = accept(sockfd, &addr, &l); } while (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)); + // Fail if we could not accept the connection if (fd < 0) { - ERROR(server->ctx, "Could not accept a new connection: %s\n", strerror(errno)); - return -1; + ERROR(ctx, "Could not accept a new connection: %s\n", strerror(errno)); + return -errno; } - DEBUG(server->ctx, "New connection accepted on socket %d\n", fd); - - // A connection has been opened - server->connections++; + DEBUG(ctx, "New connection accepted on socket %d\n", fd); // Close connections immediately when -x is set - if (server->ctx->close) { - r = 0; - + if (ctx->close) goto ERROR; - } // Find a worker to delegate this connection to - worker = fireperf_ctx_fetch_worker(server->ctx); + worker = fireperf_ctx_fetch_worker(ctx); // Close the connection if we could not find a worker if (!worker) { - ERROR(server->ctx, "Could not find a worker that could handle a new connection\n"); + ERROR(ctx, "Could not find a worker that could handle a new connection\n"); r = -EBUSY; goto ERROR; } + // Read any data + int events = EPOLLIN; + + // Send data unless we are in keepalive-only mode + if (!ctx->keepalive_only) + events |= EPOLLOUT; + // Delegate the connection - r = fireperf_worker_delegate(worker, fd); + r = fireperf_worker_delegate(worker, events, fd); if (r < 0) { - ERROR(server->ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r)); + ERROR(ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r)); goto ERROR; } return 0; ERROR: - if (fd >= 0) { - DEBUG(server->ctx, "Closing connection %d\n", fd); - - close(fd); - } + DEBUG(ctx, "Closing connection %d\n", fd); + close(fd); return r; } -static int is_listening_socket(struct fireperf_ctx* ctx, int* sockets, int fd) { - for (unsigned int i = 0; i < ctx->listening_sockets; i++) { - if (sockets[i] == fd) - return 1; - } - - return 0; -} - -int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd) { - struct fireperf_server server = { - .ctx = ctx, - }; - struct fireperf_stats stats = {}; - uint64_t expirations = 0; - int r; - - DEBUG(ctx, "Launching " PACKAGE_NAME " in server mode\n"); - - int listening_sockets[ctx->listening_sockets]; - - struct epoll_event ev; - struct epoll_event events[EPOLL_MAX_EVENTS]; - - // Create listening sockets - for (unsigned int i = 0; i < ctx->listening_sockets; i++) { - int sockfd = create_socket(ctx, i); - if (sockfd < 0) - return 1; - - listening_sockets[i] = sockfd; - - // Add listening socket to epoll - ev.events = EPOLLIN; - ev.data.fd = sockfd; - - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev)) { - ERROR(ctx, "Could not add socket file descriptor to epoll(): %s\n", - strerror(errno)); - goto ERROR; - } - } - - DEBUG(ctx, "Entering main loop...\n"); - - while (!ctx->terminated) { - int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1); - if (fds < 1) { - // We terminate gracefully when we receive a signal - if (errno == EINTR) - break; - - ERROR(ctx, "epoll_wait() failed: %s\n", strerror(errno)); - goto ERROR; - } - - for (int i = 0; i < fds; i++) { - int fd = events[i].data.fd; - - // The listening socket - if (is_listening_socket(ctx, listening_sockets, fd)) { - r = accept_connection(&server, fd); - if (r < 0) - goto ERROR; - - // Handle timer events - } else if (fd == timerfd) { - // Read from the timer to disarm it - ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations)); - if (bytes_read <= 0) { - ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno)); - goto ERROR; - } - - // Cumulate stats - stats = fireperf_ctx_get_stats(ctx); - - // Print the stats - r = fireperf_dump_stats(ctx, &stats, FIREPERF_MODE_SERVER); - if (r) - goto ERROR; - } - } - } - -ERROR: - for (unsigned int i = 0; i < ctx->listening_sockets; i++) { - if (listening_sockets[i] > 0) - close(listening_sockets[i]); - } - - return r; +void fireperf_server_free(struct fireperf_ctx*, void* data) { + return; } diff --git a/src/server.h b/src/server.h index fb5f095..3a34a57 100644 --- a/src/server.h +++ b/src/server.h @@ -23,6 +23,8 @@ #include "ctx.h" -int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd); +int fireperf_server_init(struct fireperf_ctx* ctx, void** data, int epollfd); +int fireperf_server_handle(struct fireperf_ctx* ctx, void* data, int fd); +void fireperf_server_free(struct fireperf_ctx* ctx, void* data); #endif /* FIREPERF_SERVER_H */ diff --git a/src/worker.c b/src/worker.c index d2d93f8..205b6f5 100644 --- a/src/worker.c +++ b/src/worker.c @@ -188,17 +188,17 @@ int fireperf_worker_is_alive(struct fireperf_worker* worker) { return pthread_tryjoin_np(worker->thread, NULL); } -int fireperf_worker_delegate(struct fireperf_worker* worker, int socket) { +int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket) { int r; + // Always listen to the connection closing + events |= EPOLLRDHUP; + struct epoll_event ev = { - .events = EPOLLIN|EPOLLRDHUP, + .events = events, .data.fd = socket, }; - if (!worker->ctx->keepalive_only) - ev.events |= EPOLLOUT; - // Add the socket to the loop r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev); if (r) diff --git a/src/worker.h b/src/worker.h index 7059df5..a4a34e7 100644 --- a/src/worker.h +++ b/src/worker.h @@ -32,7 +32,7 @@ int fireperf_worker_launch(struct fireperf_worker* worker); int fireperf_worker_terminate(struct fireperf_worker* worker); int fireperf_worker_is_alive(struct fireperf_worker* worker); -int fireperf_worker_delegate(struct fireperf_worker* worker, int socket); +int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket); const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker);