From 129b4966d390663a4193f8652b2a7efcb3deca30 Mon Sep 17 00:00:00 2001
From: Michael Tremer
Date: Sat, 28 Sep 2024 14:50:41 +0000
Subject: [PATCH] workers: Prototype implementation with IO uring

This should help us to reduce the number of syscalls that we have to
perform which are the bottleneck whenever we benchmark anything.

Signed-off-by: Michael Tremer
---
Review notes (not part of the commit message):

Fixed in this revision of the patch:

  * FIREPERF_EVENT_RECV completions were dispatched to
    fireperf_worker_send(); they now go to fireperf_worker_recv().
  * io_uring_prep_recv() was called with a length of zero and with
    IOSQE_BUFFER_SELECT in the position of the recv(2) flags. It now
    receives up to sizeof(worker->recv_buffer) bytes with no flags.
  * fireperf_worker_recv() no longer re-arms the receive after a
    zero-length read, which would otherwise complete over and over.
  * The "Could not submit socket %d: %s" message was missing its sockfd
    argument.
  * io_uring_queue_init() and io_uring_submit_and_wait() return a negative
    error code, so the messages now use strerror(-r) and r is no longer
    overwritten with -errno.
  * Format specifiers now match their arguments (cqe->res and the event
    type are ints, the event counter is an unsigned int).
  * The EVENT* macros parenthesise their arguments.

Left as is, but worth a look before this goes beyond a prototype:

  * The client passes events = 0 when keepalive_only is set and duplex
    is not; fireperf_worker_delegate() rejects that with -EINVAL.
  * The old EPOLLRDHUP handling is gone: Closed connections are neither
    closed nor deducted from open_connections, and a failed send
    terminates the worker.
  * -EINTR from io_uring_submit_and_wait() is treated as an error, so
    "Worker has gracefully terminated" is no longer reachable.
  * fireperf_worker_delegate() fills and submits the worker's ring from
    its caller. Please confirm that this cannot race with the worker
    thread.
  * "r" in fireperf_worker_register_send(), fireperf_worker_register_recv()
    and fireperf_worker_send(), "bytes_sent" and MAX_CONNECTIONS are unused.
  * The header names in the first src/worker.c hunk were missing from the
    copy under review and have been restored from what the code uses.
    Please verify them against the tree. The blob hashes on the index
    lines are carried over and no longer describe the revised content.

 src/client.c |   6 +-
 src/server.c |  22 +++-
 src/worker.c | 302 +++++++++++++++++++++++++++++++++++++++-----------------
 src/worker.h |   6 +
 4 files changed, 242 insertions(+), 94 deletions(-)

diff --git a/src/client.c b/src/client.c
index f87cb56..4e0e2cf 100644
--- a/src/client.c
+++ b/src/client.c
@@ -126,15 +126,15 @@ static int open_connection(struct fireperf_ctx* ctx,
 		goto ERROR;
 	}
 
-	int events = EPOLLIN;
+	int events = 0;
 
 	// Let us know when the socket is ready for receiving data
 	if (!ctx->keepalive_only)
-		events |= EPOLLIN;
+		events |= FIREPERF_WORKER_RECV;
 
 	// In duplex mode, we send data, too
 	if (ctx->duplex)
-		events |= EPOLLOUT;
+		events |= FIREPERF_WORKER_SEND;
 
 	// Delegate the connection
 	r = fireperf_worker_delegate(worker, events, fd);
diff --git a/src/server.c b/src/server.c
index d5d0c6d..ecff0c9 100644
--- a/src/server.c
+++ b/src/server.c
@@ -95,6 +95,24 @@ int fireperf_server_init(struct fireperf_ctx* ctx, void** data, int epollfd) {
 		goto ERROR;
 	}
 
+#if 1
+	// Enable to re-use the address
+	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
+	if (r) {
+		ERROR(ctx, "Could not set SO_REUSEADDR on socket %d: %s\n",
+			fd, strerror(errno));
+		goto ERROR;
+	}
+
+	// Enable to re-use the port
+	r = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &flags, sizeof(flags));
+	if (r) {
+		ERROR(ctx, "Could not set SO_REUSEPORT on socket %d: %s\n",
+			fd, strerror(errno));
+		goto ERROR;
+	}
+#endif
+
 	// Enable zero-copy
 	r = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &flags, sizeof(flags));
 	if (r) {
@@ -196,11 +214,11 @@ int fireperf_server_handle(struct fireperf_ctx* ctx, void* data, int sockfd) {
 	}
 
 	// Read any data
-	int events = EPOLLIN;
+	int events = FIREPERF_WORKER_RECV;
 
 	// Send data unless we are in keepalive-only mode
 	if (!ctx->keepalive_only)
-		events |= EPOLLOUT;
+		events |= FIREPERF_WORKER_SEND;
 
 	// Delegate the connection
 	r = fireperf_worker_delegate(worker, events, fd);
diff --git a/src/worker.c b/src/worker.c
index 75f64b2..7fe21d7 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -19,11 +19,14 @@
 #############################################################################*/
 
 #include <errno.h>
+#include <poll.h>
 #include <pthread.h>
 #include <stdlib.h>
-#include <sys/epoll.h>
+#include <string.h>
 #include <unistd.h>
 
+#include <liburing.h>
+
 #include "constants.h"
 #include "ctx.h"
 #include "main.h"
@@ -32,8 +35,23 @@
 #include "stats.h"
 #include "worker.h"
 
+#define MAX_CONNECTIONS 128
+
 static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 };
 
+// Event types which are stored in the user data of every request
+enum {
+	FIREPERF_EVENT_SEND = 1,
+	FIREPERF_EVENT_SENT = 2,
+	FIREPERF_EVENT_RECV = 3,
+};
+
+// Packs the event type (lowest 8 bits) and the file descriptor into the user data
+#define EVENT(type, fd) (void*)(uint64_t)(((uint64_t)(fd) << 8) | (type))
+
+#define EVENT_GET_TYPE(e) ((int)((uint64_t)(e) & 0xff))
+#define EVENT_GET_FD(e) ((int)((uint64_t)(e) >> 8))
+
 struct fireperf_worker {
 	pthread_t thread;
 
@@ -43,13 +61,33 @@ struct fireperf_worker {
 	// Collect stats
 	struct fireperf_stats stats;
 
-	// Epoll
-	int epollfd;
+	// IO uring
+	struct io_uring ring;
+
+	// Receive Buffer (1 MiB)
+	char recv_buffer[1 * 1024 * 1024];
 
 	// Exit Code
 	int r;
 };
 
+// Registers the worker's receive buffer with the ring and returns the result
+// of io_uring_register_buffers() which is negative on failure
+static int fireperf_worker_register_recv_buffer(struct fireperf_worker* worker) {
+	const struct iovec recv_buffer = {
+		.iov_base = worker->recv_buffer,
+		.iov_len = sizeof(worker->recv_buffer),
+	};
+	int r;
+
+	// Register for zero-copy
+	r = io_uring_register_buffers(&worker->ring, &recv_buffer, 1);
+	if (r < 0)
+		ERROR(worker->ctx, "Could not register the receive buffer: %s\n", strerror(-r));
+
+	return r;
+}
+
 int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx* ctx) {
 	struct fireperf_worker* w = NULL;
 	int r;
@@ -62,14 +100,18 @@ int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx*
 	// Reference the configuration
 	w->ctx = ctx;
 
-	// Create a new event loop
-	w->epollfd = epoll_create1(0);
-	if (w->epollfd < 0) {
-		ERROR(ctx, "Could not create event loop: %m\n");
-		r = -errno;
+	// Setup ring
+	r = io_uring_queue_init(512, &w->ring, 0);
+	if (r) {
+		ERROR(w->ctx, "Could not set up IO uring: %s\n", strerror(-r));
 		goto ERROR;
 	}
 
+	// Register the receive buffer
+	r = fireperf_worker_register_recv_buffer(w);
+	if (r)
+		goto ERROR;
+
 	DEBUG(ctx, "Created a new worker\n");
 
 	// Return the worker
@@ -85,54 +127,118 @@ ERROR:
 }
 
 void fireperf_worker_free(struct fireperf_worker* worker) {
-	if (worker->epollfd >= 0)
-		close(worker->epollfd);
+	io_uring_queue_exit(&worker->ring);
 
 	free(worker);
 }
 
-static int fireperf_worker_send(struct fireperf_worker* worker, int fd) {
+// Queues a poll request which completes once sockfd is ready to be written to
+static int fireperf_worker_register_send(struct fireperf_worker* worker, const int sockfd) {
+	int r;
+
+	// Fetch a new submission queue entry
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+	if (!sqe)
+		return -ENOSPC;
+
+	// Poll for when the socket is ready to write to
+	io_uring_prep_poll_add(sqe, sockfd, POLLOUT);
+
+	// Store the event
+	io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_SEND, sockfd));
+
+	return 0;
+}
+
+// Handles FIREPERF_EVENT_SEND: Queues a zero-copy send of one buffer to sockfd
+static int fireperf_worker_send(
+		struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
 	const char* buffer = ZERO;
 	ssize_t bytes_sent;
+	int r;
+
+	// Fetch a new submission queue entry
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+	if (!sqe)
+		return -ENOSPC;
 
 	// Fetch some random data if requested
 	if (!worker->ctx->zero)
 		buffer = fireperf_random_pool_get_slice(worker->ctx, SOCKET_SEND_BUFFER_SIZE);
 
-	// Send the buffer
-	do {
-		bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY);
-	} while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+	// Send a chunk of data
+	io_uring_prep_send_zc(sqe, sockfd, buffer, SOCKET_SEND_BUFFER_SIZE, 0, 0);
 
-	// Update statistics
-	worker->stats.bytes_sent += bytes_sent;
-	worker->stats.total_bytes_sent += bytes_sent;
+	// Store the event
+	io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_SENT, sockfd));
 
 	return 0;
 }
 
-static int fireperf_worker_recv(struct fireperf_worker* worker, int fd) {
-	char buffer[SOCKET_RECV_BUFFER_SIZE];
-	ssize_t bytes_read;
+// Handles FIREPERF_EVENT_SENT: Updates the statistics and polls for the next send
+static int fireperf_worker_sent(
+		struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
+	// Handle errors
+	if (cqe->res < 0) {
+		ERROR(worker->ctx, "Could not send to %d: %s\n", sockfd, strerror(-cqe->res));
+		return cqe->res;
+	}
 
-	// Try reading into buffer
-	do {
-		bytes_read = recv(fd, buffer, sizeof(buffer), 0);
-	} while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+	// Nothing to account for (presumably the zero-copy notification - TODO confirm)
+	if (cqe->res == 0)
+		return 0;
 
-	// Error?
-	if (bytes_read < 0) {
-		ERROR(worker->ctx, "Could not read from socket %d: %m\n", fd);
-		return -errno;
+	DEBUG(worker->ctx, "Sent %d bytes to socket %d\n", cqe->res, sockfd);
+
+	// Update statistics
+	worker->stats.bytes_sent += cqe->res;
+	worker->stats.total_bytes_sent += cqe->res;
+
+	// Ask to send more data
+	return fireperf_worker_register_send(worker, sockfd);
+}
+
+// Queues a receive from sockfd into the worker's receive buffer
+static int fireperf_worker_register_recv(struct fireperf_worker* worker, const int sockfd) {
+	int r;
+
+	// Fetch a new submission queue entry
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+	if (!sqe)
+		return -ENOSPC;
+
+	// Receive as much as fits into the buffer
+	io_uring_prep_recv(sqe, sockfd, worker->recv_buffer, sizeof(worker->recv_buffer), 0);
+
+	// Store the event
+	io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_RECV, sockfd));
+
+	return 0;
+}
+
+// Handles FIREPERF_EVENT_RECV: Updates the statistics and queues the next receive
+static int fireperf_worker_recv(
+		struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
+	size_t bytes_read = cqe->res;
+
+	// Did we encounter an error?
+	if (cqe->res < 0) {
+		ERROR(worker->ctx, "Could not read from socket %d: %s\n", sockfd, strerror(-cqe->res));
+		return cqe->res;
 	}
 
-	//DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd);
+	// The peer has closed the connection, so do not ask for any more data
+	if (cqe->res == 0)
+		return 0;
+
+	DEBUG(worker->ctx, "Read %zu bytes from socket %d\n", bytes_read, sockfd);
 
 	// Update statistics
 	worker->stats.bytes_received += bytes_read;
 	worker->stats.total_bytes_received += bytes_read;
 
-	return 0;
+	// Ask to receive more data
+	return fireperf_worker_register_recv(worker, sockfd);
 }
 
 /*
@@ -145,61 +251,64 @@ static void* fireperf_worker_main(void* w) {
 
 	DEBUG(worker->ctx, "New worker launched as %lu\n", pthread_self());
 
-	struct epoll_event events[EPOLL_MAX_EVENTS];
+	struct io_uring_cqe* cqe = NULL;
+
+	unsigned int head = 0;
 
 	// Enter the main loop...
 	for (;;) {
-		ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
-		if (ready < 1) {
-			// We terminate gracefully when we receive a signal
-			if (errno == EINTR)
-				break;
-
-			ERROR(worker->ctx, "epoll_wait() failed: %m\n");
-			r = -errno;
+		// Submit the submission queue and wait for anything to finish
+		r = io_uring_submit_and_wait(&worker->ring, 1);
+		if (r < 0) {
+			ERROR(worker->ctx, "io_uring_submit_and_wait() failed: %s\n", strerror(-r));
 			goto ERROR;
 		}
 
-		// Loop through all sockets that are ready...
-		for (int i = 0; i < ready; i++) {
-			int fd = events[i].data.fd;
-
-			// Handle closed connections
-			if (events[i].events & EPOLLRDHUP) {
-				DEBUG(worker->ctx, "Connection %d has closed\n", fd);
-
-				// We now have one fewer connections
-				worker->stats.open_connections--;
-
-				// Remove the file descriptor from epoll()
-				r = epoll_ctl(worker->epollfd, EPOLL_CTL_DEL, fd, NULL);
-				if (r) {
-					ERROR(worker->ctx, "Could not remove socket file descriptfor from epoll(): %m\n");
-					r = -errno;
-					goto ERROR;
-				}
-
-				// Free up any resources
-				close(fd);
-
-				// There is no point in continuing any further after the connection closed
-				break;
-			}
-
-			// Handle incoming data
-			if (events[i].events & EPOLLIN) {
-				r = fireperf_worker_recv(worker, fd);
-				if (r < 0)
-					goto ERROR;
+		unsigned int events = 0;
+
+		// Process everything in the completion queue
+		io_uring_for_each_cqe(&worker->ring, head, cqe) {
+			// Fetch the event
+			void* event = io_uring_cqe_get_data(cqe);
+
+			// Fetch the file descriptor
+			int fd = EVENT_GET_FD(event);
+
+			//DEBUG(worker->ctx, "Handling event %d\n", EVENT_GET_TYPE(event));
+
+			switch (EVENT_GET_TYPE(event)) {
+				case FIREPERF_EVENT_SEND:
+					r = fireperf_worker_send(worker, cqe, fd);
+					if (r)
+						goto ERROR;
+					break;
+
+				case FIREPERF_EVENT_SENT:
+					r = fireperf_worker_sent(worker, cqe, fd);
+					if (r)
+						goto ERROR;
+					break;
+
+				case FIREPERF_EVENT_RECV:
+					r = fireperf_worker_recv(worker, cqe, fd);
+					if (r)
+						goto ERROR;
+					break;
+
+				// Log unhandled events
+				default:
+					DEBUG(worker->ctx, "Unhandled event %d\n", EVENT_GET_TYPE(event));
+					break;
 			}
 
-			// Handle outgoing data
-			if (events[i].events & EPOLLOUT) {
-				r = fireperf_worker_send(worker, fd);
-				if (r < 0)
-					goto ERROR;
-			}
+			// Count all successfully handled events
+			events++;
 		}
+
+		DEBUG(worker->ctx, "Handled %u event(s)\n", events);
+
+		// Mark them all as done
+		io_uring_cq_advance(&worker->ring, events);
 	}
 
 	DEBUG(worker->ctx, "Worker has gracefully terminated\n");
@@ -236,26 +345,41 @@ int fireperf_worker_is_alive(struct fireperf_worker* worker) {
 	return pthread_tryjoin_np(worker->thread, NULL);
 }
 
-int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket) {
+// Hands the connection sockfd over to the worker. events is a bitmask of
+// FIREPERF_WORKER_SEND and FIREPERF_WORKER_RECV and must not be zero.
+// Returns zero on success or a negative error code.
+int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int sockfd) {
 	int r;
 
-	// Always listen to the connection closing
-	events |= EPOLLRDHUP;
+	// Some event must be requested
+	if (!events)
+		return -EINVAL;
 
-	struct epoll_event ev = {
-		.events = events,
-		.data.fd = socket,
-	};
+	// Send data?
+	if (events & FIREPERF_WORKER_SEND) {
+		r = fireperf_worker_register_send(worker, sockfd);
+		if (r)
+			return r;
+	}
 
-	// Add the socket to the loop
-	r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev);
-	if (r)
-		ERROR(worker->ctx, "Could not add the socket to epoll(): %m\n");
+	// Receive data?
+	if (events & FIREPERF_WORKER_RECV) {
+		r = fireperf_worker_register_recv(worker, sockfd);
+		if (r)
+			return r;
+	}
+
+	// Submit the request(s)
+	r = io_uring_submit(&worker->ring);
+	if (r < 0) {
+		ERROR(worker->ctx, "Could not submit socket %d: %s\n", sockfd, strerror(-r));
+		return r;
+	}
 
 	// We now have one more open connections
 	worker->stats.open_connections++;
 
-	return r;
+	return 0;
 }
 
 const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker) {
diff --git a/src/worker.h b/src/worker.h
index a4a34e7..8bf8c49 100644
--- a/src/worker.h
+++ b/src/worker.h
@@ -25,6 +25,12 @@ struct fireperf_worker;
 
 #include "ctx.h"
 
+// Events that can be requested in fireperf_worker_delegate() (bitmask)
+enum {
+	FIREPERF_WORKER_SEND = (1 << 0),
+	FIREPERF_WORKER_RECV = (1 << 1),
+};
+
 int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx* ctx);
 
 void fireperf_worker_free(struct fireperf_worker* worker);
-- 
2.47.2