]> git.ipfire.org Git - fireperf.git/commitdiff
workers: Prototype implementation with IO uring
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 28 Sep 2024 14:50:41 +0000 (14:50 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 28 Sep 2024 14:50:41 +0000 (14:50 +0000)
This should help us to reduce the number of syscalls that we have to
perform which are the bottleneck whenever we benchmark anything.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/client.c
src/server.c
src/worker.c
src/worker.h

index f87cb56ff3600ef2b0dd9393124a50ab593ec68e..4e0e2cf9e88065e32b679d77b214360446d18a98 100644 (file)
@@ -126,15 +126,15 @@ static int open_connection(struct fireperf_ctx* ctx,
                goto ERROR;
        }
 
-       int events = EPOLLIN;
+       int events = 0;
 
        // Let us know when the socket is ready for receiving data
        if (!ctx->keepalive_only)
-               events |= EPOLLIN;
+               events |= FIREPERF_WORKER_RECV;
 
        // In duplex mode, we send data, too
        if (ctx->duplex)
-               events |= EPOLLOUT;
+               events |= FIREPERF_WORKER_SEND;
 
        // Delegate the connection
        r = fireperf_worker_delegate(worker, events, fd);
index d5d0c6d1ae08179bf3ea5ebc4b1e7b38ab2c7930..ecff0c94b9ebbb6a5e466e6cbf15bfc708c29d94 100644 (file)
@@ -95,6 +95,24 @@ int fireperf_server_init(struct fireperf_ctx* ctx, void** data, int epollfd) {
                goto ERROR;
        }
 
+#if 1
+       // Enable to re-use the address
+       r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
+       if (r) {
+               ERROR(ctx, "Could not set SO_REUSEADDR on socket %d: %s\n",
+                       fd, strerror(errno));
+               goto ERROR;
+       }
+
+       // Enable to re-use the port
+       r = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &flags, sizeof(flags));
+       if (r) {
+               ERROR(ctx, "Could not set SO_REUSEPORT on socket %d: %s\n",
+                       fd, strerror(errno));
+               goto ERROR;
+       }
+#endif
+
        // Enable zero-copy
        r = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &flags, sizeof(flags));
        if (r) {
@@ -196,11 +214,11 @@ int fireperf_server_handle(struct fireperf_ctx* ctx, void* data, int sockfd) {
        }
 
        // Read any data
-       int events = EPOLLIN;
+       int events = FIREPERF_WORKER_RECV;
 
        // Send data unless we are in keepalive-only mode
        if (!ctx->keepalive_only)
-               events |= EPOLLOUT;
+               events |= FIREPERF_WORKER_SEND;
 
        // Delegate the connection
        r = fireperf_worker_delegate(worker, events, fd);
index 75f64b2b94c034eb1fbee4aa70601dfc8a6f8373..7fe21d7ee40c6868f15b9ac15b2bf6f33a6bdb1c 100644 (file)
 #############################################################################*/
 
 #include <errno.h>
+#include <poll.h>
 #include <pthread.h>
 #include <stdlib.h>
-#include <sys/epoll.h>
+#include <string.h>
 #include <unistd.h>
 
+#include <liburing.h>
+
 #include "constants.h"
 #include "ctx.h"
 #include "main.h"
 #include "stats.h"
 #include "worker.h"
 
+#define MAX_CONNECTIONS 128
+
 static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 };
 
+enum {
+       FIREPERF_EVENT_SEND = 1,
+       FIREPERF_EVENT_SENT = 2,
+       FIREPERF_EVENT_RECV = 3,
+};
+
+#define EVENT(type, fd)   (void*)(uint64_t)((fd << 8) | type)
+
+#define EVENT_GET_TYPE(e) ((uint64_t)e & 0xff)
+#define EVENT_GET_FD(e)   ((uint64_t)e >> 8)
+
 struct fireperf_worker {
        pthread_t thread;
 
@@ -43,13 +59,31 @@ struct fireperf_worker {
        // Collect stats
        struct fireperf_stats stats;
 
-       // Epoll
-       int epollfd;
+       // IO uring
+       struct io_uring ring;
+
+       // Receive Buffer (1 MiB)
+       char recv_buffer[1 * 1024 * 1024];
 
        // Exit Code
        int r;
 };
 
+static int fireperf_worker_register_recv_buffer(struct fireperf_worker* worker) {
+       const struct iovec recv_buffer = {
+               .iov_base = worker->recv_buffer,
+               .iov_len = sizeof(worker->recv_buffer),
+       };
+       int r;
+
+       // Register for zero-copy
+       r = io_uring_register_buffers(&worker->ring, &recv_buffer, 1);
+       if (r < 0)
+               ERROR(worker->ctx, "Could not register the receive buffer: %s\n", strerror(-r));
+
+       return r;
+}
+
 int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx* ctx) {
        struct fireperf_worker* w = NULL;
        int r;
@@ -62,14 +96,18 @@ int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx*
        // Reference the configuration
        w->ctx = ctx;
 
-       // Create a new event loop
-       w->epollfd = epoll_create1(0);
-       if (w->epollfd < 0) {
-               ERROR(ctx, "Could not create event loop: %m\n");
-               r = -errno;
+       // Setup ring
+       r = io_uring_queue_init(512, &w->ring, 0);
+       if (r) {
+               ERROR(w->ctx, "Could not set up IO uring: %s\n", strerror(r));
                goto ERROR;
        }
 
+       // Register the receive buffer
+       r = fireperf_worker_register_recv_buffer(w);
+       if (r)
+               goto ERROR;
+
        DEBUG(ctx, "Created a new worker\n");
 
        // Return the worker
@@ -85,54 +123,107 @@ ERROR:
 }
 
 void fireperf_worker_free(struct fireperf_worker* worker) {
-       if (worker->epollfd >= 0)
-               close(worker->epollfd);
+       io_uring_queue_exit(&worker->ring);
 
        free(worker);
 }
 
-static int fireperf_worker_send(struct fireperf_worker* worker, int fd) {
+static int fireperf_worker_register_send(struct fireperf_worker* worker, const int sockfd) {
+       int r;
+
+       // Fetch a new submission queue entry
+       struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+       if (!sqe)
+               return -ENOSPC;
+
+       // Poll for when the socket is ready to write to
+       io_uring_prep_poll_add(sqe, sockfd, POLLOUT);
+
+       // Store the event
+       io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_SEND, sockfd));
+
+       return 0;
+}
+
+static int fireperf_worker_send(
+               struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
        const char* buffer = ZERO;
        ssize_t bytes_sent;
+       int r;
+
+       // Fetch a new submission queue entry
+       struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+       if (!sqe)
+               return -ENOSPC;
 
        // Fetch some random data if requested
        if (!worker->ctx->zero)
                buffer = fireperf_random_pool_get_slice(worker->ctx, SOCKET_SEND_BUFFER_SIZE);
 
-       // Send the buffer
-       do {
-               bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY);
-       } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+       // Send a chunk of data
+       io_uring_prep_send_zc(sqe, sockfd, buffer, SOCKET_SEND_BUFFER_SIZE, 0, 0);
 
-       // Update statistics
-       worker->stats.bytes_sent       += bytes_sent;
-       worker->stats.total_bytes_sent += bytes_sent;
+       // Store the event
+       io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_SENT, sockfd));
 
        return 0;
 }
 
-static int fireperf_worker_recv(struct fireperf_worker* worker, int fd) {
-       char buffer[SOCKET_RECV_BUFFER_SIZE];
-       ssize_t bytes_read;
+static int fireperf_worker_sent(
+               struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
+       // Handle errors
+       if (cqe->res < 0) {
+               ERROR(worker->ctx, "Could not send to %d: %s\n", sockfd, strerror(-cqe->res));
+               return cqe->res;
+       }
 
-       // Try reading into buffer
-       do {
-               bytes_read = recv(fd, buffer, sizeof(buffer), 0);
-       } while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+       if (cqe->res == 0)
+               return 0;
 
-       // Error?
-       if (bytes_read < 0) {
-               ERROR(worker->ctx, "Could not read from socket %d: %m\n", fd);
-               return -errno;
+       DEBUG(worker->ctx, "Sent %zu bytes to socket %d\n", cqe->res, sockfd);
+
+       // Update statistics
+       worker->stats.bytes_sent       += cqe->res;
+       worker->stats.total_bytes_sent += cqe->res;
+
+       // Ask to send more data
+       return fireperf_worker_register_send(worker, sockfd);
+}
+
+static int fireperf_worker_register_recv(struct fireperf_worker* worker, const int sockfd) {
+       int r;
+
+       // Fetch a new submission queue entry
+       struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+       if (!sqe)
+               return -ENOSPC;
+
+       io_uring_prep_recv(sqe, sockfd, worker->recv_buffer, 0, IOSQE_BUFFER_SELECT);
+
+       // Store the event
+       io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_RECV, sockfd));
+
+       return 0;
+}
+
+static int fireperf_worker_recv(
+               struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
+       size_t bytes_read = cqe->res;
+
+       // Did we encounter an error?
+       if (cqe->res < 0) {
+               ERROR(worker->ctx, "Could not read from socket %d: %s\n", sockfd, strerror(-cqe->res));
+               return cqe->res;
        }
 
-       //DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd);
+       DEBUG(worker->ctx, "Read %zu bytes from socket %d\n", bytes_read, sockfd);
 
        // Update statistics
        worker->stats.bytes_received       += bytes_read;
        worker->stats.total_bytes_received += bytes_read;
 
-       return 0;
+       // Ask to receive more data
+       return fireperf_worker_register_recv(worker, sockfd);
 }
 
 /*
@@ -145,61 +236,65 @@ static void* fireperf_worker_main(void* w) {
 
        DEBUG(worker->ctx, "New worker launched as %lu\n", pthread_self());
 
-       struct epoll_event events[EPOLL_MAX_EVENTS];
+       struct io_uring_cqe* cqe = NULL;
+
+       unsigned int head = 0;
 
        // Enter the main loop...
        for (;;) {
-               ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
-               if (ready < 1) {
-                       // We terminate gracefully when we receive a signal
-                       if (errno == EINTR)
-                               break;
-
-                       ERROR(worker->ctx, "epoll_wait() failed: %m\n");
+               // Submit the submission queue and wait for anything to finish
+               r = io_uring_submit_and_wait(&worker->ring, 1);
+               if (r < 0) {
+                       ERROR(worker->ctx, "io_uring_submit_and_wait() failed: %m\n");
                        r = -errno;
                        goto ERROR;
                }
 
-               // Loop through all sockets that are ready...
-               for (int i = 0; i < ready; i++) {
-                       int fd = events[i].data.fd;
-
-                       // Handle closed connections
-                       if (events[i].events & EPOLLRDHUP) {
-                               DEBUG(worker->ctx, "Connection %d has closed\n", fd);
-
-                               // We now have one fewer connections
-                               worker->stats.open_connections--;
-
-                               // Remove the file descriptor from epoll()
-                               r = epoll_ctl(worker->epollfd, EPOLL_CTL_DEL, fd, NULL);
-                               if (r) {
-                                       ERROR(worker->ctx, "Could not remove socket file descriptfor from epoll(): %m\n");
-                                       r = -errno;
-                                       goto ERROR;
-                               }
-
-                               // Free up any resources
-                               close(fd);
-
-                               // There is no point in continuing any further after the connection closed
-                               break;
-                       }
-
-                       // Handle incoming data
-                       if (events[i].events & EPOLLIN) {
-                               r = fireperf_worker_recv(worker, fd);
-                               if (r < 0)
-                                       goto ERROR;
+               unsigned int events = 0;
+
+               // Process everything in the completion queue
+               io_uring_for_each_cqe(&worker->ring, head, cqe) {
+                       // Fetch the event
+                       void* event = io_uring_cqe_get_data(cqe);
+
+                       // Fetch the file descriptor
+                       int fd = EVENT_GET_FD(event);
+
+                       //DEBUG(worker->ctx, "Handling event %d\n", EVENT_GET_TYPE(event));
+
+                       switch (EVENT_GET_TYPE(event)) {
+                               case FIREPERF_EVENT_SEND:
+                                       r = fireperf_worker_send(worker, cqe, fd);
+                                       if (r)
+                                               goto ERROR;
+                                       break;
+
+                               case FIREPERF_EVENT_SENT:
+                                       r = fireperf_worker_sent(worker, cqe, fd);
+                                       if (r)
+                                               goto ERROR;
+                                       break;
+
+                               case FIREPERF_EVENT_RECV:
+                                       r = fireperf_worker_send(worker, cqe, fd);
+                                       if (r)
+                                               goto ERROR;
+                                       break;
+
+                               // Log unhandled events
+                               default:
+                                       DEBUG(worker->ctx, "Unhandled event %d\n", EVENT_GET_TYPE(event));
+                                       break;
                        }
 
-                       // Handle outgoing data
-                       if (events[i].events & EPOLLOUT) {
-                               r = fireperf_worker_send(worker, fd);
-                               if (r < 0)
-                                       goto ERROR;
-                       }
+                       // Count all successfully handled events
+                       events++;
                }
+
+               DEBUG(worker->ctx, "Handled %zu event(s)\n", events);
+
+               // Mark them all as done
+               io_uring_cq_advance(&worker->ring, events);
        }
 
        DEBUG(worker->ctx, "Worker has gracefully terminated\n");
@@ -236,26 +331,38 @@ int fireperf_worker_is_alive(struct fireperf_worker* worker) {
        return pthread_tryjoin_np(worker->thread, NULL);
 }
 
-int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket) {
+int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int sockfd) {
        int r;
 
-       // Always listen to the connection closing
-       events |= EPOLLRDHUP;
+       // Some event must be requested
+       if (!events)
+               return -EINVAL;
 
-       struct epoll_event ev = {
-               .events  = events,
-               .data.fd = socket,
-       };
+       // Send data?
+       if (events & FIREPERF_WORKER_SEND) {
+               r = fireperf_worker_register_send(worker, sockfd);
+               if (r)
+                       return r;
+       }
 
-       // Add the socket to the loop
-       r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev);
-       if (r)
-               ERROR(worker->ctx, "Could not add the socket to epoll(): %m\n");
+       // Receive data?
+       if (events & FIREPERF_WORKER_RECV) {
+               r = fireperf_worker_register_recv(worker, sockfd);
+               if (r)
+                       return r;
+       }
+
+       // Submit the request(s)
+       r = io_uring_submit(&worker->ring);
+       if (r < 0) {
+               ERROR(worker->ctx, "Could not submit socket %d: %s\n", strerror(-r));
+               return r;
+       }
 
        // We now have one more open connections
        worker->stats.open_connections++;
 
-       return r;
+       return 0;
 }
 
 const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker) {
index a4a34e7338e42fff22d8c5758ba1ff74a8b71dda..8bf8c49e7ba690d785920a67f9711c8d6106e4d1 100644 (file)
@@ -25,6 +25,11 @@ struct fireperf_worker;
 
 #include "ctx.h"
 
+enum {
+       FIREPERF_WORKER_SEND = (1 << 0),
+       FIREPERF_WORKER_RECV = (1 << 1),
+};
+
 int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx* ctx);
 void fireperf_worker_free(struct fireperf_worker* worker);