From b7b2e5e0e506f1eb0c79140b9cc9ab04bb530f8d Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Thu, 19 Sep 2024 10:42:28 +0000 Subject: [PATCH] worker: Move sending/receiving data into the worker Signed-off-by: Michael Tremer --- src/ctx.c | 1 + src/main.c | 48 ------------------------------------------------ src/main.h | 6 ------ src/worker.c | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 51 insertions(+), 56 deletions(-) diff --git a/src/ctx.c b/src/ctx.c index 32a3add..5bfe813 100644 --- a/src/ctx.c +++ b/src/ctx.c @@ -32,6 +32,7 @@ #include "logging.h" #include "main.h" #include "random.h" +#include "stats.h" static int parse_address(const char* string, struct in6_addr* address6) { struct in_addr address4; diff --git a/src/main.c b/src/main.c index def8f97..a8ba570 100644 --- a/src/main.c +++ b/src/main.c @@ -31,7 +31,6 @@ #include "client.h" #include "main.h" #include "logging.h" -#include "random.h" #include "server.h" #include "stats.h" @@ -261,50 +260,3 @@ int set_socket_buffer_sizes(struct fireperf_ctx* ctx, int fd) { return 0; } - -static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 }; - -int handle_connection_send(struct fireperf_ctx* ctx, - struct fireperf_stats* stats, int fd) { - const char* buffer = ZERO; - ssize_t bytes_sent; - - if (!ctx->zero) { - buffer = fireperf_random_pool_get_slice(ctx, SOCKET_SEND_BUFFER_SIZE); - } - - do { - bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY); - } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)); - - // Update statistics - stats->bytes_sent += bytes_sent; - stats->total_bytes_sent += bytes_sent; - - return 0; -} - -int handle_connection_recv(struct fireperf_ctx* ctx, - struct fireperf_stats* stats, int fd) { - char buffer[SOCKET_RECV_BUFFER_SIZE]; - ssize_t bytes_read; - - // Try reading into buffer - do { - bytes_read = recv(fd, buffer, sizeof(buffer), 0); - } while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)); - - // Error? - if (bytes_read < 0) { - ERROR(ctx, "Could not read from socket %d: %s\n", fd, strerror(errno)); - return -1; - } - - DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd); - - // Update statistics - stats->bytes_received += bytes_read; - stats->total_bytes_received += bytes_read; - - return 0; -} diff --git a/src/main.h b/src/main.h index f469144..32f5fb7 100644 --- a/src/main.h +++ b/src/main.h @@ -22,13 +22,7 @@ #define FIREPERF_MAIN_H #include "ctx.h" -#include "stats.h" int set_socket_buffer_sizes(struct fireperf_ctx* ctx, int fd); -int handle_connection_send(struct fireperf_ctx* ctx, - struct fireperf_stats* stats, int fd); -int handle_connection_recv(struct fireperf_ctx* ctx, - struct fireperf_stats* stats, int fd); - #endif /* FIREPERF_MAIN_H */ diff --git a/src/worker.c b/src/worker.c index 205b6f5..75f64b2 100644 --- a/src/worker.c +++ b/src/worker.c @@ -28,8 +28,12 @@ #include "ctx.h" #include "main.h" #include "logging.h" +#include "random.h" +#include "stats.h" #include "worker.h" +static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 }; + struct fireperf_worker { pthread_t thread; @@ -87,6 +91,50 @@ void fireperf_worker_free(struct fireperf_worker* worker) { free(worker); } +static int fireperf_worker_send(struct fireperf_worker* worker, int fd) { + const char* buffer = ZERO; + ssize_t bytes_sent; + + // Fetch some random data if requested + if (!worker->ctx->zero) + buffer = fireperf_random_pool_get_slice(worker->ctx, SOCKET_SEND_BUFFER_SIZE); + + // Send the buffer + do { + bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY); + } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)); + + // Update statistics + worker->stats.bytes_sent += bytes_sent; + worker->stats.total_bytes_sent += bytes_sent; + + return 0; +} + +static int fireperf_worker_recv(struct fireperf_worker* worker, int fd) { + char buffer[SOCKET_RECV_BUFFER_SIZE]; + ssize_t bytes_read; + + // Try reading into buffer + do { + bytes_read = recv(fd, buffer, sizeof(buffer), 0); + } while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)); + + // Error? + if (bytes_read < 0) { + ERROR(worker->ctx, "Could not read from socket %d: %m\n", fd); + return -errno; + } + + //DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd); + + // Update statistics + worker->stats.bytes_received += bytes_read; + worker->stats.total_bytes_received += bytes_read; + + return 0; +} + /* The main function of the worker. */ @@ -140,14 +188,14 @@ static void* fireperf_worker_main(void* w) { // Handle incoming data if (events[i].events & EPOLLIN) { - r = handle_connection_recv(worker->ctx, &worker->stats, fd); + r = fireperf_worker_recv(worker, fd); if (r < 0) goto ERROR; } // Handle outgoing data if (events[i].events & EPOLLOUT) { - r = handle_connection_send(worker->ctx, &worker->stats, fd); + r = fireperf_worker_send(worker, fd); if (r < 0) goto ERROR; } -- 2.47.2