#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
+#include <sys/timerfd.h>
#include <unistd.h>
#include "client.h"
#include "random.h"
#include "util.h"
-// Set to one when the timeout has expired
-static int timeout_expired = 0;
+struct fireperf_client_state {
+ // For the timeout
+ int timerfd;
-static void handle_SIGALRM(int signal) {
- switch (signal) {
- // Terminate after timeout has expired
- case SIGALRM:
- timeout_expired = 1;
- break;
- }
+ // How many open connections?
+ unsigned int connections;
+};
+
+static int setup_timer(struct fireperf_ctx* ctx, int epollfd) {
+ int timerfd = -1;
+ int r;
+
+ // Create timerfd()
+ timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
+ if (timerfd < 0)
+ return -errno;
+
+ struct epoll_event ev = {
+ .events = EPOLLIN,
+ .data.fd = timerfd,
+ };
+
+ // Register the timer with the event loop
+ r = epoll_ctl(epollfd, EPOLL_CTL_ADD, timerfd, &ev);
+ if (r)
+ return -errno;
+
+ // Let the timer ping us once a second
+ struct itimerspec timer = {
+ .it_value.tv_sec = ctx->timeout,
+ };
+
+ // Arm the timer
+ r = timerfd_settime(timerfd, 0, &timer, NULL);
+ if (r)
+ return -errno;
+
+ return timerfd;
}
-static int open_connection(struct fireperf_ctx* ctx) {
+static int open_connection(struct fireperf_ctx* ctx,
+ struct fireperf_client_state* state, int epollfd) {
+ struct fireperf_worker* worker = NULL;
+ int fd;
+ int r;
+
// Open a new socket
- int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
+ fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
if (fd < 0) {
- ERROR(ctx, "Could not open socket: %s\n", strerror(errno));
+ ERROR(ctx, "Could not open socket: %m\n");
+ r = -errno;
goto ERROR;
}
};
// Set socket buffer sizes
- int r = set_socket_buffer_sizes(ctx, fd);
+ r = set_socket_buffer_sizes(ctx, fd);
if (r)
goto ERROR;
// Connect to the server
r = connect(fd, &peer, sizeof(peer));
- if (r && (errno != EINPROGRESS)) {
- ERROR(ctx, "Could not connect to server: %s\n", strerror(errno));
+ if (r) {
+ switch (errno) {
+ case EINPROGRESS:
+ break;
+
+ default:
+ ERROR(ctx, "Could not connect to server: %m\n");
+ r = -errno;
+ goto ERROR;
+ }
+ }
+
+ // Find a worker to delegate this connection to
+ worker = fireperf_ctx_fetch_worker(ctx);
+
+ // Close the connection if we could not find a worker
+ if (!worker) {
+ ERROR(ctx, "Could not find a worker that could handle a new connection\n");
+ r = -EBUSY;
goto ERROR;
}
- return fd;
+ int events = EPOLLIN;
+
+ // Let us know when the socket is ready for receiving data
+ if (!ctx->keepalive_only)
+ events |= EPOLLIN;
+
+ // In duplex mode, we send data, too
+ if (ctx->duplex)
+ events |= EPOLLOUT;
+
+ // Delegate the connection
+ r = fireperf_worker_delegate(worker, events, fd);
+ if (r < 0) {
+ ERROR(ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
+ goto ERROR;
+ }
+
+ // Increment the number of open connections
+ state->connections++;
+
+ return 0;
ERROR:
- if (fd > 0)
+ if (fd >= 0)
close(fd);
- return -1;
+ return r;
}
-int fireperf_client(struct fireperf_ctx* ctx, struct fireperf_stats* stats,
- int epollfd, int timerfd) {
- DEBUG(ctx, "Launching " PACKAGE_NAME " in client mode\n");
-
- int r = 1;
+static int open_connections(struct fireperf_ctx* ctx,
+ struct fireperf_client_state* state, int epollfd) {
+ int r = 0;
- struct epoll_event ev = {
- .events = EPOLLIN,
- };
- struct epoll_event events[EPOLL_MAX_EVENTS];
+ // If we don't have enough open connections, we try to open more
+ while (state->connections < ctx->parallel) {
+ r = open_connection(ctx, state, epollfd);
+ if (r)
+ break;
+ }
- // Let us know when the socket is ready for receiving data
- if (!ctx->keepalive_only)
- ev.events |= EPOLLIN;
+ return r;
+}
- // In duplex mode, we send data, too
- if (ctx->duplex)
- ev.events |= EPOLLOUT;
+int fireperf_client_init(struct fireperf_ctx* ctx, void** data, int epollfd) {
+ struct fireperf_client_state* state = NULL;
+ int r;
- DEBUG(ctx, "Opening %lu connections...\n", ctx->parallel);
+ // Allocate state
+ state = calloc(1, sizeof(*state));
+ if (!state)
+ return -errno;
- // ctxigure timeout if set
+ // Setup a timer for the timeout
if (ctx->timeout) {
- // Register signal handler
- signal(SIGALRM, handle_SIGALRM);
-
- alarm(ctx->timeout);
+ state->timerfd = setup_timer(ctx, epollfd);
+ if (state->timerfd < 0)
+ return -state->timerfd;
+ } else {
+ state->timerfd = -1;
}
- DEBUG(ctx, "Entering main loop...\n");
+ // Open some initial connections
+ r = open_connections(ctx, state, epollfd);
+ if (r)
+ goto ERROR;
- while (!ctx->terminated && !timeout_expired) {
- // Open connections
- while (stats->open_connections < ctx->parallel) {
- int fd = open_connection(ctx);
- if (fd < 0)
- continue;
+ // Return the state
+ *data = state;
- ev.data.fd = fd;
+ return 0;
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) {
- ERROR(ctx, "Could not add socket file descriptor to epoll(): %s\n",
- strerror(errno));
- goto ERROR;
- }
+ERROR:
+ if (state)
+ fireperf_client_free(ctx, state);
- stats->open_connections++;
- stats->connections++;
- }
+ return r;
+}
- int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1);
- if (fds < 1) {
- // We terminate gracefully when we receive a signal
- if (errno == EINTR)
- break;
+int fireperf_client_handle(struct fireperf_ctx* ctx, void* data, int fd) {
+ struct fireperf_client_state* state = data;
- ERROR(ctx, "epoll_wait() failed: %s\n", strerror(errno));
- goto ERROR;
- }
+ // Check if the timeout has fired
+ if (state->timerfd == fd) {
+ DEBUG(ctx, "Timeout reached\n");
- for (int i = 0; i < fds; i++) {
- int fd = events[i].data.fd;
-
- // What type of event are we handling?
-
- // Handle timer events
- if (fd == timerfd) {
- uint64_t expirations;
-
- // Read from the timer to disarm it
- ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
- if (bytes_read <= 0) {
- ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno));
- goto ERROR;
- }
-
- r = fireperf_dump_stats(ctx, stats, FIREPERF_MODE_CLIENT);
- if (r)
- goto ERROR;
-
- // Handle connection sockets
- } else {
- // Has the socket been disconnected?
- if (events[i].events & EPOLLHUP) {
- if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) {
- ERROR(ctx, "Could not remove socket file descriptor from epoll(): %s\n",
- strerror(errno));
- goto ERROR;
- }
-
- close(fd);
-
- stats->open_connections--;
- } else {
- // Close connections immediately when -x is set
- if (ctx->close) {
- DEBUG(ctx, "Closing connection %d\n", fd);
- close(fd);
-
- stats->open_connections--;
- continue;
- }
-
- // Handle incoming data
- if (events[i].events & EPOLLIN) {
- r = handle_connection_recv(ctx, stats, fd);
- if (r < 0)
- goto ERROR;
- }
-
- // Handle outgoing data
- if (events[i].events & EPOLLOUT) {
- r = handle_connection_send(ctx, stats, fd);
- if (r)
- goto ERROR;
- }
- }
- }
- }
+ return -ETIME;
}
- // All okay
- r = 0;
+ return 0;
+}
-ERROR:
- return r;
+void fireperf_client_free(struct fireperf_ctx* ctx, void* data) {
+ struct fireperf_client_state* state = NULL;
+
+ if (state->timerfd >= 0)
+ close(state->timerfd);
+
+ free(state);
}
#define FIREPERF_CLIENT_H
#include "ctx.h"
-#include "main.h"
-int fireperf_client(struct fireperf_ctx* ctx, struct fireperf_stats* stats,
- int epollfd, int timerfd);
+int fireperf_client_init(struct fireperf_ctx* ctx, void** data, int epollfd);
+int fireperf_client_handle(struct fireperf_ctx* ctx, void* data, int fd);
+void fireperf_client_free(struct fireperf_ctx* ctx, void* data);
#endif /* FIREPERF_CLIENT_H */
int main(int argc, char* argv[]) {
struct fireperf_ctx* ctx = NULL;
struct fireperf_stats stats = { 0 };
- int r;
+ uint64_t expirations = 0;
+ void* data = NULL;
int epollfd = -1;
int timerfd = -1;
+ int ready;
+ int r;
// Create a new context
r = fireperf_ctx_create(&ctx, argc, argv);
if (r)
return r;
+ switch (ctx->mode) {
+ case FIREPERF_MODE_CLIENT:
+ case FIREPERF_MODE_SERVER:
+ break;
+
+ case FIREPERF_MODE_NONE:
+ fprintf(stderr, "No mode selected\n");
+ r = 2;
+ goto ERROR;
+ }
+
// Set limits
r = set_limits(ctx);
if (r)
goto ERROR;
}
+ // Initialize the client/server
switch (ctx->mode) {
case FIREPERF_MODE_CLIENT:
- return fireperf_client(ctx, &stats, epollfd, timerfd);
+ r = fireperf_client_init(ctx, &data, epollfd);
+ if (r)
+ goto ERROR;
+ break;
case FIREPERF_MODE_SERVER:
- return fireperf_server(ctx, epollfd, timerfd);
-
- case FIREPERF_MODE_NONE:
- fprintf(stderr, "No mode selected\n");
- r = 2;
+ r = fireperf_server_init(ctx, &data, epollfd);
+ if (r)
+ goto ERROR;
break;
+
+ default:
+ abort();
+ }
+
+ struct epoll_event events[EPOLL_MAX_EVENTS];
+
+ // Main loop
+ for (;;) {
+ ready = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1);
+ if (ready < 1) {
+ // We terminate gracefully when we receive a signal
+ if (errno == EINTR)
+ break;
+
+ ERROR(ctx, "epoll_wait() failed: %m\n");
+ goto ERROR;
+ }
+
+ // Iterate over all file descriptors that have activity
+ for (int i = 0; i < ready; i++) {
+ int fd = events[i].data.fd;
+
+ // Handle timer events
+ if (fd == timerfd) {
+ // Read from the timer to disarm it
+ ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
+ if (bytes_read <= 0) {
+ ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno));
+ goto ERROR;
+ }
+
+ // Cumulate stats
+ stats = fireperf_ctx_get_stats(ctx);
+
+ // Print the stats
+ r = fireperf_dump_stats(ctx, &stats, ctx->mode);
+ if (r)
+ goto ERROR;
+
+ // Handle everything else
+ } else {
+ switch (ctx->mode) {
+ case FIREPERF_MODE_CLIENT:
+ r = fireperf_client_handle(ctx, data, fd);
+ if (r)
+ goto ERROR;
+ break;
+
+ case FIREPERF_MODE_SERVER:
+ r = fireperf_server_handle(ctx, data, fd);
+ if (r)
+ goto ERROR;
+ break;
+
+ default:
+ abort();
+ }
+ }
+ }
}
ERROR:
+ switch (ctx->mode) {
+ case FIREPERF_MODE_CLIENT:
+ fireperf_client_free(ctx, data);
+ break;
+
+ case FIREPERF_MODE_SERVER:
+ fireperf_server_free(ctx, data);
+ break;
+
+ default:
+ abort();
+ }
+
if (epollfd > 0)
close(epollfd);
if (timerfd > 0)
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
-#include <time.h>
#include <unistd.h>
#include "constants.h"
#include "logging.h"
#include "main.h"
#include "server.h"
-#include "util.h"
#include "worker.h"
-struct fireperf_server {
- // ctxiguration
- struct fireperf_ctx* ctx;
-
- // Count all connections
- unsigned int connections;
-};
-
static int enable_keepalive(struct fireperf_ctx* ctx, int fd) {
// Enable keepalive
int flags = 1;
return 0;
}
-static int create_socket(struct fireperf_ctx* ctx, int i) {
+static int create_socket(struct fireperf_ctx* ctx, int epollfd, unsigned int i) {
+ int flags = 1;
+ int fd;
int r;
// Open a new socket
- int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
+ fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
if (fd < 0) {
ERROR(ctx, "Could not open socket: %s\n", strerror(errno));
goto ERROR;
}
- int flags = 1;
+ // Enable re-use port
r = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &flags, sizeof(flags));
if (r) {
ERROR(ctx, "Could not set SO_REUSEPORT on socket %d: %s\n",
// Bind it to the selected port
r = bind(fd, &addr, sizeof(addr));
if (r) {
- ERROR(ctx, "Could not bind socket: %s\n", strerror(errno));
+ ERROR(ctx, "Could not bind socket: %m\n");
+ r = -errno;
goto ERROR;
}
// Listen
r = listen(fd, SOCKET_BACKLOG);
if (r) {
- ERROR(ctx, "Could not listen on socket: %s\n", strerror(errno));
+ ERROR(ctx, "Could not listen on socket: %m\n");
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Add listening socket to epoll
+ struct epoll_event ev = {
+ .events = EPOLLIN,
+ .data.fd = fd,
+ };
+
+ // Add the socket to the event loop
+ r = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
+ if (r) {
+ ERROR(ctx, "Could not add socket file descriptor to epoll(): %m\n");
+ r = -errno;
goto ERROR;
}
ERROR:
close(fd);
- return -1;
+ return r;
}
-static int accept_connection(struct fireperf_server* server, int sockfd) {
- struct fireperf_worker* worker = NULL;
- struct sockaddr_in6 addr = {};
+int fireperf_server_init(struct fireperf_ctx* ctx, void** data, int epollfd) {
int r;
- socklen_t l = sizeof(addr);
+ // Create listening sockets
+ for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
+ r = create_socket(ctx, epollfd, i);
+ if (r < 0)
+ return r;
+ }
+ return 0;
+}
+
+int fireperf_server_handle(struct fireperf_ctx* ctx, void* data, int sockfd) {
+ struct fireperf_worker* worker = NULL;
+ struct sockaddr_in6 addr = {};
int fd = -1;
+ int r = 0;
+
+ socklen_t l = sizeof(addr);
// The listening socket is ready, there is a new connection waiting to be accepted
do {
fd = accept(sockfd, &addr, &l);
} while (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+ // Fail if we could not accept the connection
if (fd < 0) {
- ERROR(server->ctx, "Could not accept a new connection: %s\n", strerror(errno));
- return -1;
+ ERROR(ctx, "Could not accept a new connection: %s\n", strerror(errno));
+ return -errno;
}
- DEBUG(server->ctx, "New connection accepted on socket %d\n", fd);
-
- // A connection has been opened
- server->connections++;
+ DEBUG(ctx, "New connection accepted on socket %d\n", fd);
// Close connections immediately when -x is set
- if (server->ctx->close) {
- r = 0;
-
+ if (ctx->close)
goto ERROR;
- }
// Find a worker to delegate this connection to
- worker = fireperf_ctx_fetch_worker(server->ctx);
+ worker = fireperf_ctx_fetch_worker(ctx);
// Close the connection if we could not find a worker
if (!worker) {
- ERROR(server->ctx, "Could not find a worker that could handle a new connection\n");
+ ERROR(ctx, "Could not find a worker that could handle a new connection\n");
r = -EBUSY;
goto ERROR;
}
+ // Read any data
+ int events = EPOLLIN;
+
+ // Send data unless we are in keepalive-only mode
+ if (!ctx->keepalive_only)
+ events |= EPOLLOUT;
+
// Delegate the connection
- r = fireperf_worker_delegate(worker, fd);
+ r = fireperf_worker_delegate(worker, events, fd);
if (r < 0) {
- ERROR(server->ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
+ ERROR(ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
goto ERROR;
}
return 0;
ERROR:
- if (fd >= 0) {
- DEBUG(server->ctx, "Closing connection %d\n", fd);
-
- close(fd);
- }
+ DEBUG(ctx, "Closing connection %d\n", fd);
+ close(fd);
return r;
}
-static int is_listening_socket(struct fireperf_ctx* ctx, int* sockets, int fd) {
- for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
- if (sockets[i] == fd)
- return 1;
- }
-
- return 0;
-}
-
-int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd) {
- struct fireperf_server server = {
- .ctx = ctx,
- };
- struct fireperf_stats stats = {};
- uint64_t expirations = 0;
- int r;
-
- DEBUG(ctx, "Launching " PACKAGE_NAME " in server mode\n");
-
- int listening_sockets[ctx->listening_sockets];
-
- struct epoll_event ev;
- struct epoll_event events[EPOLL_MAX_EVENTS];
-
- // Create listening sockets
- for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
- int sockfd = create_socket(ctx, i);
- if (sockfd < 0)
- return 1;
-
- listening_sockets[i] = sockfd;
-
- // Add listening socket to epoll
- ev.events = EPOLLIN;
- ev.data.fd = sockfd;
-
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev)) {
- ERROR(ctx, "Could not add socket file descriptor to epoll(): %s\n",
- strerror(errno));
- goto ERROR;
- }
- }
-
- DEBUG(ctx, "Entering main loop...\n");
-
- while (!ctx->terminated) {
- int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1);
- if (fds < 1) {
- // We terminate gracefully when we receive a signal
- if (errno == EINTR)
- break;
-
- ERROR(ctx, "epoll_wait() failed: %s\n", strerror(errno));
- goto ERROR;
- }
-
- for (int i = 0; i < fds; i++) {
- int fd = events[i].data.fd;
-
- // The listening socket
- if (is_listening_socket(ctx, listening_sockets, fd)) {
- r = accept_connection(&server, fd);
- if (r < 0)
- goto ERROR;
-
- // Handle timer events
- } else if (fd == timerfd) {
- // Read from the timer to disarm it
- ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
- if (bytes_read <= 0) {
- ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno));
- goto ERROR;
- }
-
- // Cumulate stats
- stats = fireperf_ctx_get_stats(ctx);
-
- // Print the stats
- r = fireperf_dump_stats(ctx, &stats, FIREPERF_MODE_SERVER);
- if (r)
- goto ERROR;
- }
- }
- }
-
-ERROR:
- for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
- if (listening_sockets[i] > 0)
- close(listening_sockets[i]);
- }
-
- return r;
+void fireperf_server_free(struct fireperf_ctx*, void* data) {
+ return;
}
#include "ctx.h"
-int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd);
+int fireperf_server_init(struct fireperf_ctx* ctx, void** data, int epollfd);
+int fireperf_server_handle(struct fireperf_ctx* ctx, void* data, int fd);
+void fireperf_server_free(struct fireperf_ctx* ctx, void* data);
#endif /* FIREPERF_SERVER_H */
return pthread_tryjoin_np(worker->thread, NULL);
}
-int fireperf_worker_delegate(struct fireperf_worker* worker, int socket) {
+int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket) {
int r;
+ // Always listen to the connection closing
+ events |= EPOLLRDHUP;
+
struct epoll_event ev = {
- .events = EPOLLIN|EPOLLRDHUP,
+ .events = events,
.data.fd = socket,
};
- if (!worker->ctx->keepalive_only)
- ev.events |= EPOLLOUT;
-
// Add the socket to the loop
r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev);
if (r)
int fireperf_worker_terminate(struct fireperf_worker* worker);
int fireperf_worker_is_alive(struct fireperf_worker* worker);
-int fireperf_worker_delegate(struct fireperf_worker* worker, int socket);
+int fireperf_worker_delegate(struct fireperf_worker* worker, int events, int socket);
const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker);