]> git.ipfire.org Git - fireperf.git/commitdiff
worker: Move sending/receiving data into the worker
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 19 Sep 2024 10:42:28 +0000 (10:42 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 19 Sep 2024 10:42:28 +0000 (10:42 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/ctx.c
src/main.c
src/main.h
src/worker.c

index 32a3add983972b4af141f377886e1d71298ab3fd..5bfe813dbb54309054d343844776174803c1fbdd 100644 (file)
--- a/src/ctx.c
+++ b/src/ctx.c
@@ -32,6 +32,7 @@
 #include "logging.h"
 #include "main.h"
 #include "random.h"
+#include "stats.h"
 
 static int parse_address(const char* string, struct in6_addr* address6) {
        struct in_addr address4;
index def8f9747ab21922338daa2851229324d1be835d..a8ba5707a569186aa84faac7b533eaa3bc1bcccc 100644 (file)
@@ -31,7 +31,6 @@
 #include "client.h"
 #include "main.h"
 #include "logging.h"
-#include "random.h"
 #include "server.h"
 #include "stats.h"
 
@@ -261,50 +260,3 @@ int set_socket_buffer_sizes(struct fireperf_ctx* ctx, int fd) {
 
        return 0;
 }
-
-static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 };
-
-int handle_connection_send(struct fireperf_ctx* ctx,
-               struct fireperf_stats* stats, int fd) {
-       const char* buffer = ZERO;
-       ssize_t bytes_sent;
-
-       if (!ctx->zero) {
-               buffer = fireperf_random_pool_get_slice(ctx, SOCKET_SEND_BUFFER_SIZE);
-       }
-
-       do {
-               bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY);
-       } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
-
-       // Update statistics
-       stats->bytes_sent += bytes_sent;
-       stats->total_bytes_sent += bytes_sent;
-
-       return 0;
-}
-
-int handle_connection_recv(struct fireperf_ctx* ctx,
-               struct fireperf_stats* stats, int fd) {
-       char buffer[SOCKET_RECV_BUFFER_SIZE];
-       ssize_t bytes_read;
-
-       // Try reading into buffer
-       do {
-               bytes_read = recv(fd, buffer, sizeof(buffer), 0);
-       } while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
-
-       // Error?
-       if (bytes_read < 0) {
-               ERROR(ctx, "Could not read from socket %d: %s\n", fd, strerror(errno));
-               return -1;
-       }
-
-       DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd);
-
-       // Update statistics
-       stats->bytes_received += bytes_read;
-       stats->total_bytes_received += bytes_read;
-
-       return 0;
-}
index f469144102f1764b82c51c0fe4e9732504513b54..32f5fb7d6e91bffdc162464417ab3839cab6b59d 100644 (file)
 #define FIREPERF_MAIN_H
 
 #include "ctx.h"
-#include "stats.h"
 
 int set_socket_buffer_sizes(struct fireperf_ctx* ctx, int fd);
 
-int handle_connection_send(struct fireperf_ctx* ctx,
-               struct fireperf_stats* stats, int fd);
-int handle_connection_recv(struct fireperf_ctx* ctx,
-               struct fireperf_stats* stats, int fd);
-
 #endif /* FIREPERF_MAIN_H */
index 205b6f5337df09e9754b333dc6e74dfea5de6900..75f64b2b94c034eb1fbee4aa70601dfc8a6f8373 100644 (file)
 #include "ctx.h"
 #include "main.h"
 #include "logging.h"
+#include "random.h"
+#include "stats.h"
 #include "worker.h"
 
+static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 };
+
 struct fireperf_worker {
        pthread_t thread;
 
@@ -87,6 +91,50 @@ void fireperf_worker_free(struct fireperf_worker* worker) {
        free(worker);
 }
 
+static int fireperf_worker_send(struct fireperf_worker* worker, int fd) {
+       const char* buffer = ZERO;
+       ssize_t bytes_sent;
+
+       // Fetch some random data if requested
+       if (!worker->ctx->zero)
+               buffer = fireperf_random_pool_get_slice(worker->ctx, SOCKET_SEND_BUFFER_SIZE);
+
+       // Send the buffer
+       do {
+               bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY);
+       } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+
+       // Update statistics
+       worker->stats.bytes_sent       += bytes_sent;
+       worker->stats.total_bytes_sent += bytes_sent;
+
+       return 0;
+}
+
+static int fireperf_worker_recv(struct fireperf_worker* worker, int fd) {
+       char buffer[SOCKET_RECV_BUFFER_SIZE];
+       ssize_t bytes_read;
+
+       // Try reading into buffer
+       do {
+               bytes_read = recv(fd, buffer, sizeof(buffer), 0);
+       } while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+
+       // Error?
+       if (bytes_read < 0) {
+               ERROR(worker->ctx, "Could not read from socket %d: %m\n", fd);
+               return -errno;
+       }
+
+       //DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd);
+
+       // Update statistics
+       worker->stats.bytes_received       += bytes_read;
+       worker->stats.total_bytes_received += bytes_read;
+
+       return 0;
+}
+
 /*
        The main function of the worker.
 */
@@ -140,14 +188,14 @@ static void* fireperf_worker_main(void* w) {
 
                        // Handle incoming data
                        if (events[i].events & EPOLLIN) {
-                               r = handle_connection_recv(worker->ctx, &worker->stats, fd);
+                               r = fireperf_worker_recv(worker, fd);
                                if (r < 0)
                                        goto ERROR;
                        }
 
                        // Handle outgoing data
                        if (events[i].events & EPOLLOUT) {
-                               r = handle_connection_send(worker->ctx, &worker->stats, fd);
+                               r = fireperf_worker_send(worker, fd);
                                if (r < 0)
                                        goto ERROR;
                        }