]> git.ipfire.org Git - fireperf.git/commitdiff
main: Merge client and server to use the workers
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 19 Sep 2024 10:35:02 +0000 (10:35 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 19 Sep 2024 10:35:02 +0000 (10:35 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/client.c
src/client.h
src/main.c
src/server.c
src/server.h
src/worker.c
src/worker.h

index 35f9c92f6cac9b9a025d1c0a02a65a9adc3915f6..e5618b15fc5be2aef10a00439102bf2060e07ad0 100644 (file)
@@ -23,6 +23,7 @@
 #include <signal.h>
 #include <string.h>
 #include <sys/epoll.h>
+#include <sys/timerfd.h>
 #include <unistd.h>
 
 #include "client.h"
 #include "random.h"
 #include "util.h"
 
-// Set to one when the timeout has expired
-static int timeout_expired = 0;
+struct fireperf_client_state {
+       // For the timeout
+       int timerfd;
 
-static void handle_SIGALRM(int signal) {
-       switch (signal) {
-               // Terminate after timeout has expired
-               case SIGALRM:
-                       timeout_expired = 1;
-                       break;
-       }
+       // How many open connections?
+       unsigned int connections;
+};
+
+static int setup_timer(struct fireperf_ctx* ctx, int epollfd) {
+       int timerfd = -1;
+       int r;
+
+       // Create timerfd()
+       timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
+       if (timerfd < 0)
+               return -errno;
+
+       struct epoll_event ev = {
+               .events  = EPOLLIN,
+               .data.fd = timerfd,
+       };
+
+       // Register the timer with the event loop
+       r = epoll_ctl(epollfd, EPOLL_CTL_ADD, timerfd, &ev);
+       if (r)
+               return -errno;
+
+       // Let the timer ping us once a second
+       struct itimerspec timer = {
+               .it_value.tv_sec = ctx->timeout,
+       };
+
+       // Arm the timer
+       r = timerfd_settime(timerfd, 0, &timer, NULL);
+       if (r)
+               return -errno;
+
+       return timerfd;
 }
 
-static int open_connection(struct fireperf_ctx* ctx) {
+static int open_connection(struct fireperf_ctx* ctx,
+               struct fireperf_client_state* state, int epollfd) {
+       struct fireperf_worker* worker = NULL;
+       int fd;
+       int r;
+
        // Open a new socket
-       int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
+       fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
        if (fd < 0) {
-               ERROR(ctx, "Could not open socket: %s\n", strerror(errno));
+               ERROR(ctx, "Could not open socket: %m\n");
+               r = -errno;
                goto ERROR;
        }
 
@@ -66,150 +101,130 @@ static int open_connection(struct fireperf_ctx* ctx) {
        };
 
        // Set socket buffer sizes
-       int r = set_socket_buffer_sizes(ctx, fd);
+       r = set_socket_buffer_sizes(ctx, fd);
        if (r)
                goto ERROR;
 
        // Connect to the server
        r = connect(fd, &peer, sizeof(peer));
-       if (r && (errno != EINPROGRESS)) {
-               ERROR(ctx, "Could not connect to server: %s\n", strerror(errno));
+       if (r) {
+               switch (errno) {
+                       case EINPROGRESS:
+                               break;
+
+                       default:
+                               ERROR(ctx, "Could not connect to server: %m\n");
+                               r = -errno;
+                               goto ERROR;
+               }
+       }
+
+       // Find a worker to delegate this connection to
+       worker = fireperf_ctx_fetch_worker(ctx);
+
+       // Close the connection if we could not find a worker
+       if (!worker) {
+               ERROR(ctx, "Could not find a worker that could handle a new connection\n");
+               r = -EBUSY;
                goto ERROR;
        }
 
-       return fd;
+       int events = EPOLLIN;
+
+       // Let us know when the socket is ready for receiving data
+       if (!ctx->keepalive_only)
+               events |= EPOLLIN;
+
+       // In duplex mode, we send data, too
+       if (ctx->duplex)
+               events |= EPOLLOUT;
+
+       // Delegate the connection
+       r = fireperf_worker_delegate(worker, events, fd);
+       if (r < 0) {
+               ERROR(ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
+               goto ERROR;
+       }
+
+       // Increment the number of open connections
+       state->connections++;
+
+       return 0;
 
 ERROR:
-       if (fd > 0)
+       if (fd >= 0)
                close(fd);
 
-       return -1;
+       return r;
 }
 
-int fireperf_client(struct fireperf_ctx* ctx, struct fireperf_stats* stats,
-               int epollfd, int timerfd) {
-       DEBUG(ctx, "Launching " PACKAGE_NAME " in client mode\n");
-
-       int r = 1;
+static int open_connections(struct fireperf_ctx* ctx,
+               struct fireperf_client_state* state, int epollfd) {
+       int r = 0;
 
-       struct epoll_event ev = {
-               .events = EPOLLIN,
-       };
-       struct epoll_event events[EPOLL_MAX_EVENTS];
+       // If we don't have enough open connections, we try to open more
+       while (state->connections < ctx->parallel) {
+               r = open_connection(ctx, state, epollfd);
+               if (r)
+                       break;
+       }
 
-       // Let us know when the socket is ready for receiving data
-       if (!ctx->keepalive_only)
-               ev.events |= EPOLLIN;
+       return r;
+}
 
-       // In duplex mode, we send data, too
-       if (ctx->duplex)
-               ev.events |= EPOLLOUT;
+int fireperf_client_init(struct fireperf_ctx* ctx, void** data, int epollfd) {
+       struct fireperf_client_state* state = NULL;
+       int r;
 
-       DEBUG(ctx, "Opening %lu connections...\n", ctx->parallel);
+       // Allocate state
+       state = calloc(1, sizeof(*state));
+       if (!state)
+               return -errno;
 
-       // ctxigure timeout if set
+       // Setup a timer for the timeout
        if (ctx->timeout) {
-               // Register signal handler
-               signal(SIGALRM, handle_SIGALRM);
-
-               alarm(ctx->timeout);
+               state->timerfd = setup_timer(ctx, epollfd);
+               if (state->timerfd < 0)
+                       return -state->timerfd;
+       } else {
+               state->timerfd = -1;
        }
 
-       DEBUG(ctx, "Entering main loop...\n");
+       // Open some initial connections
+       r = open_connections(ctx, state, epollfd);
+       if (r)
+               goto ERROR;
 
-       while (!ctx->terminated && !timeout_expired) {
-               // Open connections
-               while (stats->open_connections < ctx->parallel) {
-                       int fd = open_connection(ctx);
-                       if (fd < 0)
-                               continue;
+       // Return the state
+       *data = state;
 
-                       ev.data.fd = fd;
+       return 0;
 
-                       if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) {
-                               ERROR(ctx, "Could not add socket file descriptor to epoll(): %s\n",
-                                       strerror(errno));
-                               goto ERROR;
-                       }
+ERROR:
+       if (state)
+               fireperf_client_free(ctx, state);
 
-                       stats->open_connections++;
-                       stats->connections++;
-               }
+       return r;
+}
 
-               int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1);
-               if (fds < 1) {
-                       // We terminate gracefully when we receive a signal
-                       if (errno == EINTR)
-                               break;
+int fireperf_client_handle(struct fireperf_ctx* ctx, void* data, int fd) {
+       struct fireperf_client_state* state = data;
 
-                       ERROR(ctx, "epoll_wait() failed: %s\n", strerror(errno));
-                       goto ERROR;
-               }
+       // Check if the timeout has fired
+       if (state->timerfd == fd) {
+               DEBUG(ctx, "Timeout reached\n");
 
-               for (int i = 0; i < fds; i++) {
-                       int fd = events[i].data.fd;
-
-                       // What type of event are we handling?
-
-                       // Handle timer events
-                       if (fd == timerfd) {
-                               uint64_t expirations;
-
-                               // Read from the timer to disarm it
-                               ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
-                               if (bytes_read <= 0) {
-                                       ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno));
-                                       goto ERROR;
-                               }
-
-                               r = fireperf_dump_stats(ctx, stats, FIREPERF_MODE_CLIENT);
-                               if (r)
-                                       goto ERROR;
-
-                       // Handle connection sockets
-                       } else {
-                               // Has the socket been disconnected?
-                               if (events[i].events & EPOLLHUP) {
-                                       if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) {
-                                               ERROR(ctx, "Could not remove socket file descriptor from epoll(): %s\n",
-                                                       strerror(errno));
-                                               goto ERROR;
-                                       }
-
-                                       close(fd);
-
-                                       stats->open_connections--;
-                               } else {
-                                       // Close connections immediately when -x is set
-                                       if (ctx->close) {
-                                               DEBUG(ctx, "Closing connection %d\n", fd);
-                                               close(fd);
-
-                                               stats->open_connections--;
-                                               continue;
-                                       }
-
-                                       // Handle incoming data
-                                       if (events[i].events & EPOLLIN) {
-                                               r = handle_connection_recv(ctx, stats, fd);
-                                               if (r < 0)
-                                                       goto ERROR;
-                                       }
-
-                                       // Handle outgoing data
-                                       if (events[i].events & EPOLLOUT) {
-                                               r = handle_connection_send(ctx, stats, fd);
-                                               if (r)
-                                                       goto ERROR;
-                                       }
-                               }
-                       }
-               }
+               return -ETIME;
        }
 
-       // All okay
-       r = 0;
+       return 0;
+}
 
-ERROR:
-       return r;
+void fireperf_client_free(struct fireperf_ctx* ctx, void* data) {
+       struct fireperf_client_state* state = NULL;
+
+       if (state->timerfd >= 0)
+               close(state->timerfd);
+
+       free(state);
 }
index 9ff4b76b457566a1d89e5dde1104b83ba4fdad99..f17bf89b888729bc1eb6cb57241308e9135ad44d 100644 (file)
@@ -22,9 +22,9 @@
 #define FIREPERF_CLIENT_H
 
 #include "ctx.h"
-#include "main.h"
 
-int fireperf_client(struct fireperf_ctx* ctx, struct fireperf_stats* stats,
-       int epollfd, int timerfd);
+int fireperf_client_init(struct fireperf_ctx* ctx, void** data, int epollfd);
+int fireperf_client_handle(struct fireperf_ctx* ctx, void* data, int fd);
+void fireperf_client_free(struct fireperf_ctx* ctx, void* data);
 
 #endif /* FIREPERF_CLIENT_H */
index 7e47afda1170d386afeb551d9a2c9707e2dae3dd..def8f9747ab21922338daa2851229324d1be835d 100644 (file)
@@ -96,15 +96,29 @@ static int set_limits(struct fireperf_ctx* ctx) {
 int main(int argc, char* argv[]) {
        struct fireperf_ctx* ctx = NULL;
        struct fireperf_stats stats = { 0 };
-       int r;
+       uint64_t expirations = 0;
+       void* data = NULL;
        int epollfd = -1;
        int timerfd = -1;
+       int ready;
+       int r;
 
        // Create a new context
        r = fireperf_ctx_create(&ctx, argc, argv);
        if (r)
                return r;
 
+       switch (ctx->mode) {
+               case FIREPERF_MODE_CLIENT:
+               case FIREPERF_MODE_SERVER:
+                       break;
+
+               case FIREPERF_MODE_NONE:
+                       fprintf(stderr, "No mode selected\n");
+                       r = 2;
+                       goto ERROR;
+       }
+
        // Set limits
        r = set_limits(ctx);
        if (r)
@@ -125,20 +139,95 @@ int main(int argc, char* argv[]) {
                goto ERROR;
        }
 
+       // Initialize the client/server
        switch (ctx->mode) {
                case FIREPERF_MODE_CLIENT:
-                       return fireperf_client(ctx, &stats, epollfd, timerfd);
+                       r = fireperf_client_init(ctx, &data, epollfd);
+                       if (r)
+                               goto ERROR;
+                       break;
 
                case FIREPERF_MODE_SERVER:
-                       return fireperf_server(ctx, epollfd, timerfd);
-
-               case FIREPERF_MODE_NONE:
-                       fprintf(stderr, "No mode selected\n");
-                       r = 2;
+                       r = fireperf_server_init(ctx, &data, epollfd);
+                       if (r)
+                               goto ERROR;
                        break;
+
+               default:
+                       abort();
+       }
+
+       struct epoll_event events[EPOLL_MAX_EVENTS];
+
+       // Main loop
+       for (;;) {
+               ready = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1);
+               if (ready < 1) {
+                       // We terminate gracefully when we receive a signal
+                       if (errno == EINTR)
+                               break;
+
+                       ERROR(ctx, "epoll_wait() failed: %m\n");
+                       goto ERROR;
+               }
+
+               // Iterate over all file descriptors that have activity
+               for (int i = 0; i < ready; i++) {
+                       int fd = events[i].data.fd;
+
+                       // Handle timer events
+                       if (fd == timerfd) {
+                               // Read from the timer to disarm it
+                               ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
+                               if (bytes_read <= 0) {
+                                       ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno));
+                                       goto ERROR;
+                               }
+
+                               // Cumulate stats
+                               stats = fireperf_ctx_get_stats(ctx);
+
+                               // Print the stats
+                               r = fireperf_dump_stats(ctx, &stats, ctx->mode);
+                               if (r)
+                                       goto ERROR;
+
+                       // Handle everything else
+                       } else {
+                               switch (ctx->mode) {
+                                       case FIREPERF_MODE_CLIENT:
+                                               r = fireperf_client_handle(ctx, data, fd);
+                                               if (r)
+                                                       goto ERROR;
+                                               break;
+
+                                       case FIREPERF_MODE_SERVER:
+                                               r = fireperf_server_handle(ctx, data, fd);
+                                               if (r)
+                                                       goto ERROR;
+                                               break;
+
+                                       default:
+                                               abort();
+                               }
+                       }
+               }
        }
 
 ERROR:
+       switch (ctx->mode) {
+               case FIREPERF_MODE_CLIENT:
+                       fireperf_client_free(ctx, data);
+                       break;
+
+               case FIREPERF_MODE_SERVER:
+                       fireperf_server_free(ctx, data);
+                       break;
+
+               default:
+                       abort();
+       }
+
        if (epollfd > 0)
                close(epollfd);
        if (timerfd > 0)
index 8136961d061fdd8331368027b0a8a0c03e254297..0aab9b002dbf6e38af6960bdeefe8737211557e4 100644 (file)
@@ -24,7 +24,6 @@
 #include <stdio.h>
 #include <string.h>
 #include <sys/epoll.h>
-#include <time.h>
 #include <unistd.h>
 
 #include "constants.h"
 #include "logging.h"
 #include "main.h"
 #include "server.h"
-#include "util.h"
 #include "worker.h"
 
-struct fireperf_server {
-       // ctxiguration
-       struct fireperf_ctx* ctx;
-
-       // Count all connections
-       unsigned int connections;
-};
-
 static int enable_keepalive(struct fireperf_ctx* ctx, int fd) {
        // Enable keepalive
        int flags = 1;
@@ -93,17 +83,19 @@ static int enable_keepalive(struct fireperf_ctx* ctx, int fd) {
        return 0;
 }
 
-static int create_socket(struct fireperf_ctx* ctx, int i) {
+static int create_socket(struct fireperf_ctx* ctx, int epollfd, unsigned int i) {
+       int flags = 1;
+       int fd;
        int r;
 
        // Open a new socket
-       int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
+       fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
        if (fd < 0) {
                ERROR(ctx, "Could not open socket: %s\n", strerror(errno));
                goto ERROR;
        }
 
-       int flags = 1;
+       // Enable re-use port
        r = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &flags, sizeof(flags));
        if (r) {
                ERROR(ctx, "Could not set SO_REUSEPORT on socket %d: %s\n",
@@ -139,14 +131,30 @@ static int create_socket(struct fireperf_ctx* ctx, int i) {
        // Bind it to the selected port
        r = bind(fd, &addr, sizeof(addr));
        if (r) {
-               ERROR(ctx, "Could not bind socket: %s\n", strerror(errno));
+               ERROR(ctx, "Could not bind socket: %m\n");
+               r = -errno;
                goto ERROR;
        }
 
        // Listen
        r = listen(fd, SOCKET_BACKLOG);
        if (r) {
-               ERROR(ctx, "Could not listen on socket: %s\n", strerror(errno));
+               ERROR(ctx, "Could not listen on socket: %m\n");
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Add listening socket to epoll
+       struct epoll_event ev = {
+               .events  = EPOLLIN,
+               .data.fd = fd,
+       };
+
+       // Add the socket to the event loop
+       r = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
+       if (r) {
+               ERROR(ctx, "Could not add socket file descriptor to epoll(): %m\n");
+               r = -errno;
                goto ERROR;
        }
 
@@ -157,159 +165,80 @@ static int create_socket(struct fireperf_ctx* ctx, int i) {
 ERROR:
        close(fd);
 
-       return -1;
+       return r;
 }
 
-static int accept_connection(struct fireperf_server* server, int sockfd) {
-       struct fireperf_worker* worker = NULL;
-       struct sockaddr_in6 addr = {};
+int fireperf_server_init(struct fireperf_ctx* ctx, void** data, int epollfd) {
        int r;
 
-       socklen_t l = sizeof(addr);
+       // Create listening sockets
+       for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
+               r = create_socket(ctx, epollfd, i);
+               if (r < 0)
+                       return r;
+       }
 
+       return 0;
+}
+
+int fireperf_server_handle(struct fireperf_ctx* ctx, void* data, int sockfd) {
+       struct fireperf_worker* worker = NULL;
+       struct sockaddr_in6 addr = {};
        int fd = -1;
+       int r = 0;
+
+       socklen_t l = sizeof(addr);
 
        // The listening socket is ready, there is a new connection waiting to be accepted
        do {
                fd = accept(sockfd, &addr, &l);
        } while (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
 
+       // Fail if we could not accept the connection
        if (fd < 0) {
-               ERROR(server->ctx, "Could not accept a new connection: %s\n", strerror(errno));
-               return -1;
+               ERROR(ctx, "Could not accept a new connection: %s\n", strerror(errno));
+               return -errno;
        }
 
-       DEBUG(server->ctx, "New connection accepted on socket %d\n", fd);
-
-       // A connection has been opened
-       server->connections++;
+       DEBUG(ctx, "New connection accepted on socket %d\n", fd);
 
        // Close connections immediately when -x is set
-       if (server->ctx->close) {
-               r = 0;
-
+       if (ctx->close)
                goto ERROR;
-       }
 
        // Find a worker to delegate this connection to
-       worker = fireperf_ctx_fetch_worker(server->ctx);
+       worker = fireperf_ctx_fetch_worker(ctx);
 
        // Close the connection if we could not find a worker
        if (!worker) {
-               ERROR(server->ctx, "Could not find a worker that could handle a new connection\n");
+               ERROR(ctx, "Could not find a worker that could handle a new connection\n");
                r = -EBUSY;
                goto ERROR;
        }
 
+       // Read any data
+       int events = EPOLLIN;
+
+       // Send data unless we are in keepalive-only mode
+       if (!ctx->keepalive_only)
+               events |= EPOLLOUT;
+
        // Delegate the connection
-       r = fireperf_worker_delegate(worker, fd);
+       r = fireperf_worker_delegate(worker, events, fd);
        if (r < 0) {
-               ERROR(server->ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
+               ERROR(ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
                goto ERROR;
        }
 
        return 0;
 
 ERROR:
-       if (fd >= 0) {
-               DEBUG(server->ctx, "Closing connection %d\n", fd);
-
-               close(fd);
-       }
+       DEBUG(ctx, "Closing connection %d\n", fd);
+       close(fd);
 
        return r;
 }
 
-static int is_listening_socket(struct fireperf_ctx* ctx, int* sockets, int fd) {
-       for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
-               if (sockets[i] == fd)
-                       return 1;
-       }
-
-       return 0;
-}
-
-int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd) {
-       struct fireperf_server server = {
-               .ctx = ctx,
-       };
-       struct fireperf_stats stats = {};
-       uint64_t expirations = 0;
-       int r;
-
-       DEBUG(ctx, "Launching " PACKAGE_NAME " in server mode\n");
-
-       int listening_sockets[ctx->listening_sockets];
-
-       struct epoll_event ev;
-       struct epoll_event events[EPOLL_MAX_EVENTS];
-
-       // Create listening sockets
-       for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
-               int sockfd = create_socket(ctx, i);
-               if (sockfd < 0)
-                       return 1;
-
-               listening_sockets[i] = sockfd;
-
-               // Add listening socket to epoll
-               ev.events  = EPOLLIN;
-               ev.data.fd = sockfd;
-
-               if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev)) {
-                       ERROR(ctx, "Could not add socket file descriptor to epoll(): %s\n",
-                               strerror(errno));
-                       goto ERROR;
-               }
-       }
-
-       DEBUG(ctx, "Entering main loop...\n");
-
-       while (!ctx->terminated) {
-               int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1);
-               if (fds < 1) {
-                       // We terminate gracefully when we receive a signal
-                       if (errno == EINTR)
-                               break;
-
-                       ERROR(ctx, "epoll_wait() failed: %s\n", strerror(errno));
-                       goto ERROR;
-               }
-
-               for (int i = 0; i < fds; i++) {
-                       int fd = events[i].data.fd;
-
-                       // The listening socket
-                       if (is_listening_socket(ctx, listening_sockets, fd)) {
-                               r = accept_connection(&server, fd);
-                               if (r < 0)
-                                       goto ERROR;
-
-                       // Handle timer events
-                       } else if (fd == timerfd) {
-                               // Read from the timer to disarm it
-                               ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
-                               if (bytes_read <= 0) {
-                                       ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno));
-                                       goto ERROR;
-                               }
-
-                               // Cumulate stats
-                               stats = fireperf_ctx_get_stats(ctx);
-
-                               // Print the stats
-                               r = fireperf_dump_stats(ctx, &stats, FIREPERF_MODE_SERVER);
-                               if (r)
-                                       goto ERROR;
-                       }
-               }
-       }
-
-ERROR:
-       for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
-               if (listening_sockets[i] > 0)
-                       close(listening_sockets[i]);
-       }
-
-       return r;
+void fireperf_server_free(struct fireperf_ctx*, void* data) {
+       return;
 }
index fb5f095012608de17ace13465adf8b9ca88c7598..3a34a5705d312f7ed11c27cc5cbc3b41d6b82396 100644 (file)
@@ -23,6 +23,8 @@
 
 #include "ctx.h"
 
-int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd);
+int fireperf_server_init(struct fireperf_ctx* ctx, void** data, int epollfd);
+int fireperf_server_handle(struct fireperf_ctx* ctx, void* data, int fd);
+void fireperf_server_free(struct fireperf_ctx* ctx, void* data);
 
 #endif /* FIREPERF_SERVER_H */
index d2d93f8bfd669b66cbff9376cb1a8a7914304e58..205b6f5337df09e9754b333dc6e74dfea5de6900 100644 (file)
@@ -188,17 +188,17 @@ int fireperf_worker_is_alive(struct fireperf_worker* worker) {
        return pthread_tryjoin_np(worker->thread, NULL);
 }
 
-int fireperf_worker_delegate(struct fireperf_worker* worker, int socket) {
+int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket) {
        int r;
 
+       // Always listen to the connection closing
+       events |= EPOLLRDHUP;
+
        struct epoll_event ev = {
-               .events  = EPOLLIN|EPOLLRDHUP,
+               .events  = events,
                .data.fd = socket,
        };
 
-       if (!worker->ctx->keepalive_only)
-               ev.events |= EPOLLOUT;
-
        // Add the socket to the loop
        r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev);
        if (r)
index 7059df5c905af508efa2c3240b948075e6e69ee5..a4a34e7338e42fff22d8c5758ba1ff74a8b71dda 100644 (file)
@@ -32,7 +32,7 @@ int fireperf_worker_launch(struct fireperf_worker* worker);
 int fireperf_worker_terminate(struct fireperf_worker* worker);
 int fireperf_worker_is_alive(struct fireperf_worker* worker);
 
-int fireperf_worker_delegate(struct fireperf_worker* worker, int socket);
+int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket);
 
 const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker);