#############################################################################*/
#include <errno.h>
+#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
-#include <sys/epoll.h>
+#include <string.h>
#include <unistd.h>
+#include <liburing.h>
+
#include "constants.h"
#include "ctx.h"
#include "main.h"
#include "stats.h"
#include "worker.h"
+#define MAX_CONNECTIONS 128
+
static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 };
+enum {
+ FIREPERF_EVENT_SEND = 1,
+ FIREPERF_EVENT_SENT = 2,
+ FIREPERF_EVENT_RECV = 3,
+};
+
+#define EVENT(type, fd) (void*)(uint64_t)((fd << 8) | type)
+
+#define EVENT_GET_TYPE(e) ((uint64_t)e & 0xff)
+#define EVENT_GET_FD(e) ((uint64_t)e >> 8)
+
struct fireperf_worker {
pthread_t thread;
// Collect stats
struct fireperf_stats stats;
- // Epoll
- int epollfd;
+ // IO uring
+ struct io_uring ring;
+
+ // Receive Buffer (1 MiB)
+ char recv_buffer[1 * 1024 * 1024];
// Exit Code
int r;
};
+static int fireperf_worker_register_recv_buffer(struct fireperf_worker* worker) {
+ const struct iovec recv_buffer = {
+ .iov_base = worker->recv_buffer,
+ .iov_len = sizeof(worker->recv_buffer),
+ };
+ int r;
+
+ // Register for zero-copy
+ r = io_uring_register_buffers(&worker->ring, &recv_buffer, 1);
+ if (r < 0)
+ ERROR(worker->ctx, "Could not register the receive buffer: %s\n", strerror(-r));
+
+ return r;
+}
+
int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx* ctx) {
struct fireperf_worker* w = NULL;
int r;
// Reference the configuration
w->ctx = ctx;
- // Create a new event loop
- w->epollfd = epoll_create1(0);
- if (w->epollfd < 0) {
- ERROR(ctx, "Could not create event loop: %m\n");
- r = -errno;
+ // Setup ring
+ r = io_uring_queue_init(512, &w->ring, 0);
+ if (r) {
+ ERROR(w->ctx, "Could not set up IO uring: %s\n", strerror(r));
goto ERROR;
}
+ // Register the receive buffer
+ r = fireperf_worker_register_recv_buffer(w);
+ if (r)
+ goto ERROR;
+
DEBUG(ctx, "Created a new worker\n");
// Return the worker
}
void fireperf_worker_free(struct fireperf_worker* worker) {
- if (worker->epollfd >= 0)
- close(worker->epollfd);
+ io_uring_queue_exit(&worker->ring);
free(worker);
}
-static int fireperf_worker_send(struct fireperf_worker* worker, int fd) {
+static int fireperf_worker_register_send(struct fireperf_worker* worker, const int sockfd) {
+ int r;
+
+ // Fetch a new submission queue entry
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+ if (!sqe)
+ return -ENOSPC;
+
+ // Poll for when the socket is ready to write to
+ io_uring_prep_poll_add(sqe, sockfd, POLLOUT);
+
+ // Store the event
+ io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_SEND, sockfd));
+
+ return 0;
+}
+
+static int fireperf_worker_send(
+ struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
const char* buffer = ZERO;
ssize_t bytes_sent;
+ int r;
+
+ // Fetch a new submission queue entry
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+ if (!sqe)
+ return -ENOSPC;
// Fetch some random data if requested
if (!worker->ctx->zero)
buffer = fireperf_random_pool_get_slice(worker->ctx, SOCKET_SEND_BUFFER_SIZE);
- // Send the buffer
- do {
- bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY);
- } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+ // Send a chunk of data
+ io_uring_prep_send_zc(sqe, sockfd, buffer, SOCKET_SEND_BUFFER_SIZE, 0, 0);
- // Update statistics
- worker->stats.bytes_sent += bytes_sent;
- worker->stats.total_bytes_sent += bytes_sent;
+ // Store the event
+ io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_SENT, sockfd));
return 0;
}
-static int fireperf_worker_recv(struct fireperf_worker* worker, int fd) {
- char buffer[SOCKET_RECV_BUFFER_SIZE];
- ssize_t bytes_read;
+static int fireperf_worker_sent(
+ struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
+ // Handle errors
+ if (cqe->res < 0) {
+ ERROR(worker->ctx, "Could not send to %d: %s\n", sockfd, strerror(-cqe->res));
+ return cqe->res;
+ }
- // Try reading into buffer
- do {
- bytes_read = recv(fd, buffer, sizeof(buffer), 0);
- } while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+ if (cqe->res == 0)
+ return 0;
- // Error?
- if (bytes_read < 0) {
- ERROR(worker->ctx, "Could not read from socket %d: %m\n", fd);
- return -errno;
+ DEBUG(worker->ctx, "Sent %zu bytes to socket %d\n", cqe->res, sockfd);
+
+ // Update statistics
+ worker->stats.bytes_sent += cqe->res;
+ worker->stats.total_bytes_sent += cqe->res;
+
+ // Ask to send more data
+ return fireperf_worker_register_send(worker, sockfd);
+}
+
+static int fireperf_worker_register_recv(struct fireperf_worker* worker, const int sockfd) {
+ int r;
+
+ // Fetch a new submission queue entry
+ struct io_uring_sqe *sqe = io_uring_get_sqe(&worker->ring);
+ if (!sqe)
+ return -ENOSPC;
+
+ io_uring_prep_recv(sqe, sockfd, worker->recv_buffer, 0, IOSQE_BUFFER_SELECT);
+
+ // Store the event
+ io_uring_sqe_set_data(sqe, EVENT(FIREPERF_EVENT_RECV, sockfd));
+
+ return 0;
+}
+
+static int fireperf_worker_recv(
+ struct fireperf_worker* worker, const struct io_uring_cqe* cqe, int sockfd) {
+ size_t bytes_read = cqe->res;
+
+ // Did we encounter an error?
+ if (cqe->res < 0) {
+ ERROR(worker->ctx, "Could not read from socket %d: %s\n", sockfd, strerror(-cqe->res));
+ return cqe->res;
}
- //DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd);
+ DEBUG(worker->ctx, "Read %zu bytes from socket %d\n", bytes_read, sockfd);
// Update statistics
worker->stats.bytes_received += bytes_read;
worker->stats.total_bytes_received += bytes_read;
- return 0;
+ // Ask to receive more data
+ return fireperf_worker_register_recv(worker, sockfd);
}
/*
DEBUG(worker->ctx, "New worker launched as %lu\n", pthread_self());
- struct epoll_event events[EPOLL_MAX_EVENTS];
+ struct io_uring_cqe* cqe = NULL;
+
+ unsigned int head = 0;
// Enter the main loop...
for (;;) {
- ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
- if (ready < 1) {
- // We terminate gracefully when we receive a signal
- if (errno == EINTR)
- break;
-
- ERROR(worker->ctx, "epoll_wait() failed: %m\n");
+ // Submit the submission queue and wait for anything to finish
+ r = io_uring_submit_and_wait(&worker->ring, 1);
+ if (r < 0) {
+ ERROR(worker->ctx, "io_uring_submit_and_wait() failed: %m\n");
r = -errno;
goto ERROR;
}
- // Loop through all sockets that are ready...
- for (int i = 0; i < ready; i++) {
- int fd = events[i].data.fd;
-
- // Handle closed connections
- if (events[i].events & EPOLLRDHUP) {
- DEBUG(worker->ctx, "Connection %d has closed\n", fd);
-
- // We now have one fewer connections
- worker->stats.open_connections--;
-
- // Remove the file descriptor from epoll()
- r = epoll_ctl(worker->epollfd, EPOLL_CTL_DEL, fd, NULL);
- if (r) {
- ERROR(worker->ctx, "Could not remove socket file descriptfor from epoll(): %m\n");
- r = -errno;
- goto ERROR;
- }
-
- // Free up any resources
- close(fd);
-
- // There is no point in continuing any further after the connection closed
- break;
- }
-
- // Handle incoming data
- if (events[i].events & EPOLLIN) {
- r = fireperf_worker_recv(worker, fd);
- if (r < 0)
- goto ERROR;
+ unsigned int events = 0;
+
+ // Process everything in the completion queue
+ io_uring_for_each_cqe(&worker->ring, head, cqe) {
+ // Fetch the event
+ void* event = io_uring_cqe_get_data(cqe);
+
+ // Fetch the file descriptor
+ int fd = EVENT_GET_FD(event);
+
+ //DEBUG(worker->ctx, "Handling event %d\n", EVENT_GET_TYPE(event));
+
+ switch (EVENT_GET_TYPE(event)) {
+ case FIREPERF_EVENT_SEND:
+ r = fireperf_worker_send(worker, cqe, fd);
+ if (r)
+ goto ERROR;
+ break;
+
+ case FIREPERF_EVENT_SENT:
+ r = fireperf_worker_sent(worker, cqe, fd);
+ if (r)
+ goto ERROR;
+ break;
+
+ case FIREPERF_EVENT_RECV:
+ r = fireperf_worker_send(worker, cqe, fd);
+ if (r)
+ goto ERROR;
+ break;
+
+ // Log unhandled events
+ default:
+ DEBUG(worker->ctx, "Unhandled event %d\n", EVENT_GET_TYPE(event));
+ break;
}
- // Handle outgoing data
- if (events[i].events & EPOLLOUT) {
- r = fireperf_worker_send(worker, fd);
- if (r < 0)
- goto ERROR;
- }
+ // Count all successfully handled events
+ events++;
}
+
+ DEBUG(worker->ctx, "Handled %zu event(s)\n", events);
+
+ // Mark them all as done
+ io_uring_cq_advance(&worker->ring, events);
}
DEBUG(worker->ctx, "Worker has gracefully terminated\n");
return pthread_tryjoin_np(worker->thread, NULL);
}
-int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket) {
+int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int sockfd) {
int r;
- // Always listen to the connection closing
- events |= EPOLLRDHUP;
+ // Some event must be requested
+ if (!events)
+ return -EINVAL;
- struct epoll_event ev = {
- .events = events,
- .data.fd = socket,
- };
+ // Send data?
+ if (events & FIREPERF_WORKER_SEND) {
+ r = fireperf_worker_register_send(worker, sockfd);
+ if (r)
+ return r;
+ }
- // Add the socket to the loop
- r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev);
- if (r)
- ERROR(worker->ctx, "Could not add the socket to epoll(): %m\n");
+ // Receive data?
+ if (events & FIREPERF_WORKER_RECV) {
+ r = fireperf_worker_register_recv(worker, sockfd);
+ if (r)
+ return r;
+ }
+
+ // Submit the request(s)
+ r = io_uring_submit(&worker->ring);
+ if (r < 0) {
+ ERROR(worker->ctx, "Could not submit socket %d: %s\n", strerror(-r));
+ return r;
+ }
// We now have one more open connections
worker->stats.open_connections++;
- return r;
+ return 0;
}
const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker) {