}
}
-static int open_connection(struct fireperf_config* conf) {
+static int open_connection(struct fireperf_ctx* ctx) {
// Open a new socket
int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
if (fd < 0) {
- ERROR(conf, "Could not open socket: %s\n", strerror(errno));
+ ERROR(ctx, "Could not open socket: %s\n", strerror(errno));
goto ERROR;
}
// Chose a random port
- int port = conf->port + (random() % conf->listening_sockets);
+ int port = ctx->port + (random() % ctx->listening_sockets);
- DEBUG(conf, "Opening socket %d (port %d)...\n", fd, port);
+ DEBUG(ctx, "Opening socket %d (port %d)...\n", fd, port);
// Define the peer
struct sockaddr_in6 peer = {
.sin6_family = AF_INET6,
- .sin6_addr = conf->address,
+ .sin6_addr = ctx->address,
.sin6_port = htons(port),
};
// Set socket buffer sizes
- int r = set_socket_buffer_sizes(conf, fd);
+ int r = set_socket_buffer_sizes(ctx, fd);
if (r)
goto ERROR;
// Connect to the server
r = connect(fd, &peer, sizeof(peer));
if (r && (errno != EINPROGRESS)) {
- ERROR(conf, "Could not connect to server: %s\n", strerror(errno));
+ ERROR(ctx, "Could not connect to server: %s\n", strerror(errno));
goto ERROR;
}
return -1;
}
-int fireperf_client(struct fireperf_config* conf, struct fireperf_stats* stats,
+int fireperf_client(struct fireperf_ctx* ctx, struct fireperf_stats* stats,
int epollfd, int timerfd) {
- DEBUG(conf, "Launching " PACKAGE_NAME " in client mode\n");
+ DEBUG(ctx, "Launching " PACKAGE_NAME " in client mode\n");
int r = 1;
struct epoll_event events[EPOLL_MAX_EVENTS];
// Let us know when the socket is ready for receiving data
- if (!conf->keepalive_only)
+ if (!ctx->keepalive_only)
ev.events |= EPOLLIN;
// In duplex mode, we send data, too
- if (conf->duplex)
+ if (ctx->duplex)
ev.events |= EPOLLOUT;
- DEBUG(conf, "Opening %lu connections...\n", conf->parallel);
+ DEBUG(ctx, "Opening %lu connections...\n", ctx->parallel);
- // Configure timeout if set
- if (conf->timeout) {
+ // ctxigure timeout if set
+ if (ctx->timeout) {
// Register signal handler
signal(SIGALRM, handle_SIGALRM);
- alarm(conf->timeout);
+ alarm(ctx->timeout);
}
- DEBUG(conf, "Entering main loop...\n");
+ DEBUG(ctx, "Entering main loop...\n");
- while (!conf->terminated && !timeout_expired) {
+ while (!ctx->terminated && !timeout_expired) {
// Open connections
- while (stats->open_connections < conf->parallel) {
- int fd = open_connection(conf);
+ while (stats->open_connections < ctx->parallel) {
+ int fd = open_connection(ctx);
if (fd < 0)
continue;
ev.data.fd = fd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) {
- ERROR(conf, "Could not add socket file descriptor to epoll(): %s\n",
+ ERROR(ctx, "Could not add socket file descriptor to epoll(): %s\n",
strerror(errno));
goto ERROR;
}
if (errno == EINTR)
break;
- ERROR(conf, "epoll_wait() failed: %s\n", strerror(errno));
+ ERROR(ctx, "epoll_wait() failed: %s\n", strerror(errno));
goto ERROR;
}
// Read from the timer to disarm it
ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
if (bytes_read <= 0) {
- ERROR(conf, "Could not read from timerfd: %s\n", strerror(errno));
+ ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno));
goto ERROR;
}
- r = fireperf_dump_stats(conf, stats, FIREPERF_MODE_CLIENT);
+ r = fireperf_dump_stats(ctx, stats, FIREPERF_MODE_CLIENT);
if (r)
goto ERROR;
// Has the socket been disconnected?
if (events[i].events & EPOLLHUP) {
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) {
- ERROR(conf, "Could not remove socket file descriptor from epoll(): %s\n",
+ ERROR(ctx, "Could not remove socket file descriptor from epoll(): %s\n",
strerror(errno));
goto ERROR;
}
stats->open_connections--;
} else {
// Close connections immediately when -x is set
- if (conf->close) {
- DEBUG(conf, "Closing connection %d\n", fd);
+ if (ctx->close) {
+ DEBUG(ctx, "Closing connection %d\n", fd);
close(fd);
stats->open_connections--;
// Handle incoming data
if (events[i].events & EPOLLIN) {
- r = handle_connection_recv(conf, stats, fd);
+ r = handle_connection_recv(ctx, stats, fd);
if (r < 0)
goto ERROR;
}
// Handle outgoing data
if (events[i].events & EPOLLOUT) {
- r = handle_connection_send(conf, stats, fd);
+ r = handle_connection_send(ctx, stats, fd);
if (r)
goto ERROR;
}
#include "main.h"
-int fireperf_client(struct fireperf_config* conf, struct fireperf_stats* stats,
+int fireperf_client(struct fireperf_ctx* ctx, struct fireperf_stats* stats,
int epollfd, int timerfd);
#endif /* FIREPERF_CLIENT_H */
#include "logging.h"
#include "main.h"
-int fireperf_get_log_level(struct fireperf_config* conf) {
- return conf->loglevel;
+int fireperf_get_log_level(struct fireperf_ctx* ctx) {
+ return ctx->loglevel;
}
-static void fireperf_log_console(struct fireperf_config* conf, int priority,
+static void fireperf_log_console(struct fireperf_ctx* ctx, int priority,
const char* file, int line, const char* fn, const char* format, va_list args) {
switch (priority) {
// Print error messages to stderr
}
}
-void fireperf_log(struct fireperf_config* conf, int priority,
+void fireperf_log(struct fireperf_ctx* ctx, int priority,
const char* file, int line, const char* fn, const char* format, ...) {
va_list args;
va_start(args, format);
- fireperf_log_console(conf, priority, file, line, fn, format, args);
+ fireperf_log_console(ctx, priority, file, line, fn, format, args);
va_end(args);
}
#include "main.h"
static inline void __attribute__((always_inline, format(printf, 2, 3)))
- fireperf_log_null(struct fireperf_config* conf, const char* format, ...) {}
+ fireperf_log_null(struct fireperf_ctx* ctx, const char* format, ...) {}
-#define fireperf_log_cond(conf, prio, arg...) \
+#define fireperf_log_cond(ctx, prio, arg...) \
do { \
- if (fireperf_get_log_level(conf) >= prio) \
- fireperf_log(conf, prio, __FILE__, __LINE__, __FUNCTION__, ## arg); \
+ if (fireperf_get_log_level(ctx) >= prio) \
+ fireperf_log(ctx, prio, __FILE__, __LINE__, __FUNCTION__, ## arg); \
} while (0)
#ifdef ENABLE_DEBUG
-# define DEBUG(conf, arg...) fireperf_log_cond(conf, LOG_DEBUG, ## arg)
+# define DEBUG(ctx, arg...) fireperf_log_cond(ctx, LOG_DEBUG, ## arg)
#else
-# define DEBUG(conf, arg...) fireperf_log_null(conf, ## arg)
+# define DEBUG(ctx, arg...) fireperf_log_null(ctx, ## arg)
#endif
-#define INFO(conf, arg...) fireperf_log_cond(conf, LOG_INFO, ## arg)
-#define ERROR(conf, arg...) fireperf_log_cond(conf, LOG_ERR, ## arg)
+#define INFO(ctx, arg...) fireperf_log_cond(ctx, LOG_INFO, ## arg)
+#define ERROR(ctx, arg...) fireperf_log_cond(ctx, LOG_ERR, ## arg)
-int fireperf_get_log_level(struct fireperf_config* conf);
+int fireperf_get_log_level(struct fireperf_ctx* ctx);
-void fireperf_log(struct fireperf_config* config,
+void fireperf_log(struct fireperf_ctx* ctx,
int priority, const char* file, int line, const char* fn,
const char* format, ...) __attribute__((format(printf, 6, 7)));
return 0;
}
-static int parse_port_range(struct fireperf_config* conf, const char* optarg) {
+static int parse_port_range(struct fireperf_ctx* ctx, const char* optarg) {
int first_port, last_port;
int r = sscanf(optarg, "%d:%d", &first_port, &last_port);
return 2;
}
- conf->port = first_port;
- conf->listening_sockets = (last_port - first_port) + 1;
+ ctx->port = first_port;
+ ctx->listening_sockets = (last_port - first_port) + 1;
return 0;
}
-static int parse_port(struct fireperf_config* conf, const char* optarg) {
- conf->port = atoi(optarg);
- conf->listening_sockets = 1;
+static int parse_port(struct fireperf_ctx* ctx, const char* optarg) {
+ ctx->port = atoi(optarg);
+ ctx->listening_sockets = 1;
- return check_port(conf->port);
+ return check_port(ctx->port);
}
-static int set_limits(struct fireperf_config* conf) {
+static int set_limits(struct fireperf_ctx* ctx) {
struct rlimit limit;
// Fetch current limit
if (getrlimit(RLIMIT_NOFILE, &limit) < 0) {
- ERROR(conf, "Could not read RLIMIT_NOFILE: %m\n");
+ ERROR(ctx, "Could not read RLIMIT_NOFILE: %m\n");
return 1;
}
// Apply the new limit
int r = setrlimit(RLIMIT_NOFILE, &limit);
if (r) {
- ERROR(conf, "Could not set open file limit to %lu: %m\n",
+ ERROR(ctx, "Could not set open file limit to %lu: %m\n",
(unsigned long)limit.rlim_cur);
return 1;
}
- DEBUG(conf, "RLIMIT_NOFILE set to %lu\n", (unsigned long)limit.rlim_cur);
+ DEBUG(ctx, "RLIMIT_NOFILE set to %lu\n", (unsigned long)limit.rlim_cur);
return 0;
}
-static int parse_argv(int argc, char* argv[], struct fireperf_config* conf) {
+static int parse_argv(int argc, char* argv[], struct fireperf_ctx* ctx) {
static struct option long_options[] = {
{"client", required_argument, 0, 'c'},
{"close", no_argument, 0, 'x'},
break;
case 'c':
- conf->mode = FIREPERF_MODE_CLIENT;
+ ctx->mode = FIREPERF_MODE_CLIENT;
// Parse the given IP address
- int r = parse_address(optarg, &conf->address);
+ int r = parse_address(optarg, &ctx->address);
if (r) {
fprintf(stderr, "Could not parse IP address %s\n", optarg);
return 2;
break;
case 'D':
- conf->duplex = 1;
+ ctx->duplex = 1;
break;
case 'd':
- conf->loglevel = LOG_DEBUG;
+ ctx->loglevel = LOG_DEBUG;
break;
case 'k':
- conf->keepalive_only = 1;
+ ctx->keepalive_only = 1;
break;
case 'P':
- conf->parallel = strtoul(optarg, NULL, 10);
+ ctx->parallel = strtoul(optarg, NULL, 10);
- if (conf->parallel > MAX_PARALLEL) {
+ if (ctx->parallel > MAX_PARALLEL) {
fprintf(stderr, "Number of parallel connections is too high: %lu\n",
- conf->parallel);
+ ctx->parallel);
return 2;
}
break;
case 'p':
// Try parsing the port range first.
// If this fails, we try parsing a single port
- r = parse_port_range(conf, optarg);
+ r = parse_port_range(ctx, optarg);
if (r == 1)
- r = parse_port(conf, optarg);
+ r = parse_port(ctx, optarg);
if (r)
return r;
break;
case 's':
- conf->mode = FIREPERF_MODE_SERVER;
+ ctx->mode = FIREPERF_MODE_SERVER;
break;
case 't':
- conf->timeout = strtoul(optarg, NULL, 10);
+ ctx->timeout = strtoul(optarg, NULL, 10);
break;
case 'x':
- conf->close = 1;
+ ctx->close = 1;
break;
case 'z':
- conf->zero = 1;
+ ctx->zero = 1;
break;
default:
}
int main(int argc, char* argv[]) {
- struct fireperf_config conf = {
+ struct fireperf_ctx ctx = {
.keepalive_count = DEFAULT_KEEPALIVE_COUNT,
.keepalive_interval = DEFAULT_KEEPALIVE_INTERVAL,
.listening_sockets = DEFAULT_LISTENING_SOCKETS,
int timerfd = -1;
// Parse command line
- r = parse_argv(argc, argv, &conf);
+ r = parse_argv(argc, argv, &ctx);
if (r)
return r;
srandom(time(NULL));
// Set limits
- r = set_limits(&conf);
+ r = set_limits(&ctx);
if (r)
return r;
// Initialize random pool
- if (!conf.zero) {
- conf.pool = fireperf_random_pool_create(&conf, DEFAULT_RANDOM_POOL_SIZE);
- if (!conf.pool) {
- ERROR(&conf, "Could not allocate random data\n");
+ if (!ctx.zero) {
+ ctx.pool = fireperf_random_pool_create(&ctx, DEFAULT_RANDOM_POOL_SIZE);
+ if (!ctx.pool) {
+ ERROR(&ctx, "Could not allocate random data\n");
r = 1;
goto ERROR;
}
// Initialize epoll()
epollfd = epoll_create1(0);
if (epollfd < 0) {
- ERROR(&conf, "Could not initialize epoll(): %s\n", strerror(errno));
+ ERROR(&ctx, "Could not initialize epoll(): %s\n", strerror(errno));
r = 1;
goto ERROR;
}
// Create timerfd() to print statistics
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
if (timerfd < 0) {
- ERROR(&conf, "timerfd_create() failed: %s\n", strerror(errno));
+ ERROR(&ctx, "timerfd_create() failed: %s\n", strerror(errno));
r = 1;
goto ERROR;
}
};
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, timerfd, &ev)) {
- ERROR(&conf, "Could not add timerfd to epoll(): %s\n", strerror(errno));
+ ERROR(&ctx, "Could not add timerfd to epoll(): %s\n", strerror(errno));
r = 1;
goto ERROR;
}
r = timerfd_settime(timerfd, 0, &timer, NULL);
if (r) {
- ERROR(&conf, "Could not set timer: %s\n", strerror(errno));
+ ERROR(&ctx, "Could not set timer: %s\n", strerror(errno));
r = 1;
goto ERROR;
}
- switch (conf.mode) {
+ switch (ctx.mode) {
case FIREPERF_MODE_CLIENT:
- return fireperf_client(&conf, &stats, epollfd, timerfd);
+ return fireperf_client(&ctx, &stats, epollfd, timerfd);
case FIREPERF_MODE_SERVER:
- return fireperf_server(&conf, epollfd, timerfd);
+ return fireperf_server(&ctx, epollfd, timerfd);
case FIREPERF_MODE_NONE:
fprintf(stderr, "No mode selected\n");
if (timerfd > 0)
close(timerfd);
- if (conf.pool)
- fireperf_random_pool_free(conf.pool);
+ if (ctx.pool)
+ fireperf_random_pool_free(ctx.pool);
return r;
}
-int fireperf_dump_stats(struct fireperf_config* conf, struct fireperf_stats* stats, int mode) {
+int fireperf_dump_stats(struct fireperf_ctx* ctx, struct fireperf_stats* stats, int mode) {
struct timespec now;
// Fetch the time
int r = clock_gettime(CLOCK_REALTIME, &now);
if (r) {
- ERROR(conf, "Could not fetch the time: %s\n", strerror(errno));
+ ERROR(ctx, "Could not fetch the time: %s\n", strerror(errno));
return 1;
}
// Format timestamp
const char* timestamp = format_timespec(&now);
- INFO(conf, "--- %s -------------------------\n", timestamp);
- INFO(conf, " : %12s %12s\n", "RX", "TX");
- INFO(conf, " %-20s: %25u\n", "Open Connection(s)", stats->open_connections);
- INFO(conf, " %-20s: %23.2f/s\n", "New Connections", stats->connections / delta);
+ INFO(ctx, "--- %s -------------------------\n", timestamp);
+ INFO(ctx, " : %12s %12s\n", "RX", "TX");
+ INFO(ctx, " %-20s: %25u\n", "Open Connection(s)", stats->open_connections);
+ INFO(ctx, " %-20s: %23.2f/s\n", "New Connections", stats->connections / delta);
// Show current bandwidth
char* bps_received = format_size(stats->bytes_received * 8 / delta, FIREPERF_FORMAT_BITS);
char* bps_sent = format_size(stats->bytes_sent * 8 / delta, FIREPERF_FORMAT_BITS);
if (bps_received || bps_sent) {
- INFO(conf, " %-20s: %10s/s %10s/s\n", "Current Bandwidth", bps_received, bps_sent);
+ INFO(ctx, " %-20s: %10s/s %10s/s\n", "Current Bandwidth", bps_received, bps_sent);
if (bps_received)
free(bps_received);
char* total_bytes_sent = format_size(stats->total_bytes_sent, FIREPERF_FORMAT_BYTES);
if (total_bytes_received || total_bytes_sent) {
- INFO(conf, " %-20s: %12s %12s\n", "Total Bytes", total_bytes_received, total_bytes_sent);
+ INFO(ctx, " %-20s: %12s %12s\n", "Total Bytes", total_bytes_received, total_bytes_sent);
if (total_bytes_received)
free(total_bytes_received);
}
// Empty line
- INFO(conf, "\n");
+ INFO(ctx, "\n");
// Remember when this was printed last
stats->last_printed = now;
return 0;
}
-int set_socket_buffer_sizes(struct fireperf_config* conf, int fd) {
+int set_socket_buffer_sizes(struct fireperf_ctx* ctx, int fd) {
int r;
// Set socket buffer sizes
int flags = SOCKET_SEND_BUFFER_SIZE;
r = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void*)&flags, sizeof(flags));
if (r) {
- ERROR(conf, "Could not set send buffer size on socket %d: %s\n",
+ ERROR(ctx, "Could not set send buffer size on socket %d: %s\n",
fd, strerror(errno));
return 1;
}
flags = SOCKET_RECV_BUFFER_SIZE;
r = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void*)&flags, sizeof(flags));
if (r) {
- ERROR(conf, "Could not set receive buffer size on socket %d: %s\n",
+ ERROR(ctx, "Could not set receive buffer size on socket %d: %s\n",
fd, strerror(errno));
return 1;
}
static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 };
-int handle_connection_send(struct fireperf_config* conf,
+int handle_connection_send(struct fireperf_ctx* ctx,
struct fireperf_stats* stats, int fd) {
const char* buffer = ZERO;
ssize_t bytes_sent;
- if (conf->pool) {
- buffer = fireperf_random_pool_get_slice(conf->pool, SOCKET_SEND_BUFFER_SIZE);
+ if (ctx->pool) {
+ buffer = fireperf_random_pool_get_slice(ctx->pool, SOCKET_SEND_BUFFER_SIZE);
}
do {
return 0;
}
-int handle_connection_recv(struct fireperf_config* conf,
+int handle_connection_recv(struct fireperf_ctx* ctx,
struct fireperf_stats* stats, int fd) {
char buffer[SOCKET_RECV_BUFFER_SIZE];
ssize_t bytes_read;
// Error?
if (bytes_read < 0) {
- ERROR(conf, "Could not read from socket %d: %s\n", fd, strerror(errno));
+ ERROR(ctx, "Could not read from socket %d: %s\n", fd, strerror(errno));
return -1;
}
- DEBUG(conf, "Read %zu bytes from socket %d\n", bytes_read, fd);
+ DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd);
// Update statistics
stats->bytes_received += bytes_read;
// Forward declaration
struct fireperf_random_pool;
-struct fireperf_config {
+struct fireperf_ctx {
int terminated;
int loglevel;
enum {
size_t total_bytes_sent;
};
-int fireperf_dump_stats(struct fireperf_config* conf, struct fireperf_stats* stats, int mode);
+int fireperf_dump_stats(struct fireperf_ctx* ctx, struct fireperf_stats* stats, int mode);
-int set_socket_buffer_sizes(struct fireperf_config* conf, int fd);
+int set_socket_buffer_sizes(struct fireperf_ctx* ctx, int fd);
-int handle_connection_send(struct fireperf_config* conf,
+int handle_connection_send(struct fireperf_ctx* ctx,
struct fireperf_stats* stats, int fd);
-int handle_connection_recv(struct fireperf_config* conf,
+int handle_connection_recv(struct fireperf_ctx* ctx,
struct fireperf_stats* stats, int fd);
#endif /* FIREPERF_MAIN_H */
#include "main.h"
#include "random.h"
-struct fireperf_random_pool* fireperf_random_pool_create(struct fireperf_config* conf, size_t size) {
+struct fireperf_random_pool* fireperf_random_pool_create(struct fireperf_ctx* ctx, size_t size) {
struct fireperf_random_pool* pool = calloc(1, sizeof(*pool));
if (!pool)
return NULL;
offset += getrandom(pool->data + offset, pool->size - offset, 0);
}
- DEBUG(conf, "Allocated random pool of %zu bytes(s)\n", pool->size);
+ DEBUG(ctx, "Allocated random pool of %zu bytes(s)\n", pool->size);
return pool;
};
struct fireperf_random_pool* fireperf_random_pool_create(
- struct fireperf_config* conf, size_t size);
+ struct fireperf_ctx* ctx, size_t size);
void fireperf_random_pool_free(struct fireperf_random_pool* pool);
const char* fireperf_random_pool_get_slice(struct fireperf_random_pool* pool, size_t size);
#define SOCKET_BACKLOG 1024
struct fireperf_server {
- // Configuration
- struct fireperf_config* conf;
+ // ctxiguration
+ struct fireperf_ctx* ctx;
// Count all connections
unsigned int connections;
return stats;
}
-static int enable_keepalive(struct fireperf_config* conf, int fd) {
+static int enable_keepalive(struct fireperf_ctx* ctx, int fd) {
// Enable keepalive
int flags = 1;
int r = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&flags, sizeof(flags));
if (r) {
- ERROR(conf, "Could not set SO_KEEPALIVE on socket %d: %s\n",
+ ERROR(ctx, "Could not set SO_KEEPALIVE on socket %d: %s\n",
fd, strerror(errno));
return 1;
}
// Set keepalive interval
- if (conf->keepalive_interval) {
- DEBUG(conf, "Setting keepalive interval to %d\n", conf->keepalive_interval);
+ if (ctx->keepalive_interval) {
+ DEBUG(ctx, "Setting keepalive interval to %d\n", ctx->keepalive_interval);
r = setsockopt(fd, SOL_TCP, TCP_KEEPINTVL,
- (void*)&conf->keepalive_interval, sizeof(conf->keepalive_interval));
+ (void*)&ctx->keepalive_interval, sizeof(ctx->keepalive_interval));
if (r) {
- ERROR(conf, "Could not set TCP_KEEPINTVL on socket %d: %s\n",
+ ERROR(ctx, "Could not set TCP_KEEPINTVL on socket %d: %s\n",
fd, strerror(errno));
return 1;
}
- DEBUG(conf, "Setting keepalive idle interval to %d\n", conf->keepalive_interval);
+ DEBUG(ctx, "Setting keepalive idle interval to %d\n", ctx->keepalive_interval);
flags = 1;
r = setsockopt(fd, SOL_TCP, TCP_KEEPIDLE,
(void*)&flags, sizeof(flags));
if (r) {
- ERROR(conf, "Could not set TCP_KEEPIDLE on socket %d: %s\n",
+ ERROR(ctx, "Could not set TCP_KEEPIDLE on socket %d: %s\n",
fd, strerror(errno));
return 1;
}
}
// Set keepalive count
- if (conf->keepalive_count) {
- DEBUG(conf, "Setting keepalive count to %d\n", conf->keepalive_count);
+ if (ctx->keepalive_count) {
+ DEBUG(ctx, "Setting keepalive count to %d\n", ctx->keepalive_count);
r = setsockopt(fd, SOL_TCP, TCP_KEEPCNT,
- (void*)&conf->keepalive_count, sizeof(conf->keepalive_count));
+ (void*)&ctx->keepalive_count, sizeof(ctx->keepalive_count));
if (r) {
- ERROR(conf, "Could not set TCP_KEEPCNT on socket %d: %s\n",
+ ERROR(ctx, "Could not set TCP_KEEPCNT on socket %d: %s\n",
fd, strerror(errno));
return 1;
}
return 0;
}
-static int create_socket(struct fireperf_config* conf, int i) {
+static int create_socket(struct fireperf_ctx* ctx, int i) {
int r;
// Open a new socket
int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
if (fd < 0) {
- ERROR(conf, "Could not open socket: %s\n", strerror(errno));
+ ERROR(ctx, "Could not open socket: %s\n", strerror(errno));
goto ERROR;
}
int flags = 1;
r = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &flags, sizeof(flags));
if (r) {
- ERROR(conf, "Could not set SO_REUSEPORT on socket %d: %s\n",
+ ERROR(ctx, "Could not set SO_REUSEPORT on socket %d: %s\n",
fd, strerror(errno));
goto ERROR;
}
// Enable zero-copy
r = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &flags, sizeof(flags));
if (r) {
- ERROR(conf, "Could not set SO_ZEROCOPY on socket %d: %s\n",
+ ERROR(ctx, "Could not set SO_ZEROCOPY on socket %d: %s\n",
fd, strerror(errno));
goto ERROR;
}
// Set socket buffer sizes
- r = set_socket_buffer_sizes(conf, fd);
+ r = set_socket_buffer_sizes(ctx, fd);
if (r)
goto ERROR;
// Enable keepalive
- if (conf->keepalive_only) {
- r = enable_keepalive(conf, fd);
+ if (ctx->keepalive_only) {
+ r = enable_keepalive(ctx, fd);
if (r)
goto ERROR;
}
struct sockaddr_in6 addr = {
.sin6_family = AF_INET6,
- .sin6_port = htons(conf->port + i),
+ .sin6_port = htons(ctx->port + i),
};
// Bind it to the selected port
r = bind(fd, &addr, sizeof(addr));
if (r) {
- ERROR(conf, "Could not bind socket: %s\n", strerror(errno));
+ ERROR(ctx, "Could not bind socket: %s\n", strerror(errno));
goto ERROR;
}
// Listen
r = listen(fd, SOCKET_BACKLOG);
if (r) {
- ERROR(conf, "Could not listen on socket: %s\n", strerror(errno));
+ ERROR(ctx, "Could not listen on socket: %s\n", strerror(errno));
goto ERROR;
}
- DEBUG(conf, "Created listening socket %d\n", fd);
+ DEBUG(ctx, "Created listening socket %d\n", fd);
return fd;
*/
// Create a new worker
- r = fireperf_worker_create(&worker, server->conf);
+ r = fireperf_worker_create(&worker, server->ctx);
if (r < 0) {
- ERROR(server->conf, "Could not create worker: %s\n", strerror(-r));
+ ERROR(server->ctx, "Could not create worker: %s\n", strerror(-r));
goto ERROR;
}
// Launch the worker
r = fireperf_worker_launch(worker);
if (r < 0) {
- ERROR(server->conf, "Could not launch worker: %s\n", strerror(-r));
+ ERROR(server->ctx, "Could not launch worker: %s\n", strerror(-r));
goto ERROR;
}
} while (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
if (fd < 0) {
- ERROR(server->conf, "Could not accept a new connection: %s\n", strerror(errno));
+ ERROR(server->ctx, "Could not accept a new connection: %s\n", strerror(errno));
return -1;
}
- DEBUG(server->conf, "New connection accepted on socket %d\n", fd);
+ DEBUG(server->ctx, "New connection accepted on socket %d\n", fd);
// A connection has been opened
server->connections++;
// Close connections immediately when -x is set
- if (server->conf->close) {
+ if (server->ctx->close) {
r = 0;
goto ERROR;
// Close the connection if we could not find a worker
if (!worker) {
- ERROR(server->conf, "Could not find a worker that could handle a new connection\n");
+ ERROR(server->ctx, "Could not find a worker that could handle a new connection\n");
r = -EBUSY;
goto ERROR;
}
// Delegate the connection
r = fireperf_worker_delegate(worker, fd);
if (r < 0) {
- ERROR(server->conf, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
+ ERROR(server->ctx, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
goto ERROR;
}
ERROR:
if (fd >= 0) {
- DEBUG(server->conf, "Closing connection %d\n", fd);
+ DEBUG(server->ctx, "Closing connection %d\n", fd);
close(fd);
}
return r;
}
-static int is_listening_socket(struct fireperf_config* conf, int* sockets, int fd) {
- for (unsigned int i = 0; i < conf->listening_sockets; i++) {
+static int is_listening_socket(struct fireperf_ctx* ctx, int* sockets, int fd) {
+ for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
if (sockets[i] == fd)
return 1;
}
return 0;
}
-int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd) {
+int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd) {
struct fireperf_server server = {
- .conf = conf,
+ .ctx = ctx,
};
struct fireperf_stats stats = {};
uint64_t expirations = 0;
int r;
- DEBUG(conf, "Launching " PACKAGE_NAME " in server mode\n");
+ DEBUG(ctx, "Launching " PACKAGE_NAME " in server mode\n");
- int listening_sockets[conf->listening_sockets];
+ int listening_sockets[ctx->listening_sockets];
struct epoll_event ev;
struct epoll_event events[EPOLL_MAX_EVENTS];
// Create listening sockets
- for (unsigned int i = 0; i < conf->listening_sockets; i++) {
- int sockfd = create_socket(conf, i);
+ for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
+ int sockfd = create_socket(ctx, i);
if (sockfd < 0)
return 1;
ev.data.fd = sockfd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev)) {
- ERROR(conf, "Could not add socket file descriptor to epoll(): %s\n",
+ ERROR(ctx, "Could not add socket file descriptor to epoll(): %s\n",
strerror(errno));
goto ERROR;
}
}
- DEBUG(conf, "Entering main loop...\n");
+ DEBUG(ctx, "Entering main loop...\n");
- while (!conf->terminated) {
+ while (!ctx->terminated) {
int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1);
if (fds < 1) {
// We terminate gracefully when we receive a signal
if (errno == EINTR)
break;
- ERROR(conf, "epoll_wait() failed: %s\n", strerror(errno));
+ ERROR(ctx, "epoll_wait() failed: %s\n", strerror(errno));
goto ERROR;
}
int fd = events[i].data.fd;
// The listening socket
- if (is_listening_socket(conf, listening_sockets, fd)) {
+ if (is_listening_socket(ctx, listening_sockets, fd)) {
r = accept_connection(&server, fd);
if (r < 0)
goto ERROR;
// Read from the timer to disarm it
ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
if (bytes_read <= 0) {
- ERROR(conf, "Could not read from timerfd: %s\n", strerror(errno));
+ ERROR(ctx, "Could not read from timerfd: %s\n", strerror(errno));
goto ERROR;
}
stats = get_stats(&server);
// Print the stats
- r = fireperf_dump_stats(conf, &stats, FIREPERF_MODE_SERVER);
+ r = fireperf_dump_stats(ctx, &stats, FIREPERF_MODE_SERVER);
if (r)
goto ERROR;
}
}
ERROR:
- for (unsigned int i = 0; i < conf->listening_sockets; i++) {
+ for (unsigned int i = 0; i < ctx->listening_sockets; i++) {
if (listening_sockets[i] > 0)
close(listening_sockets[i]);
}
#include "main.h"
-int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd);
+int fireperf_server(struct fireperf_ctx* ctx, int epollfd, int timerfd);
#endif /* FIREPERF_SERVER_H */
pthread_t thread;
// Configuration
- struct fireperf_config* conf;
+ struct fireperf_ctx* ctx;
// Collect stats
struct fireperf_stats stats;
int r;
};
-int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_config* conf) {
+int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx* ctx) {
struct fireperf_worker* w = NULL;
int r;
return -errno;
// Reference the configuration
- w->conf = conf;
+ w->ctx = ctx;
// Create a new event loop
w->epollfd = epoll_create1(0);
if (w->epollfd < 0) {
- ERROR(conf, "Could not create event loop: %m\n");
+ ERROR(ctx, "Could not create event loop: %m\n");
r = -errno;
goto ERROR;
}
- DEBUG(conf, "Created a new worker\n");
+ DEBUG(ctx, "Created a new worker\n");
// Return the worker
*worker = w;
int ready = 0;
int r;
- DEBUG(worker->conf, "New worker launched as %lu\n", pthread_self());
+ DEBUG(worker->ctx, "New worker launched as %lu\n", pthread_self());
struct epoll_event events[EPOLL_MAX_EVENTS];
if (errno == EINTR)
break;
- ERROR(worker->conf, "epoll_wait() failed: %m\n");
+ ERROR(worker->ctx, "epoll_wait() failed: %m\n");
r = -errno;
goto ERROR;
}
// Handle closed connections
if (events[i].events & EPOLLRDHUP) {
- DEBUG(worker->conf, "Connection %d has closed\n", fd);
+ DEBUG(worker->ctx, "Connection %d has closed\n", fd);
// We now have one fewer connections
worker->stats.open_connections--;
// Remove the file descriptor from epoll()
r = epoll_ctl(worker->epollfd, EPOLL_CTL_DEL, fd, NULL);
if (r) {
- ERROR(worker->conf, "Could not remove socket file descriptfor from epoll(): %m\n");
+ ERROR(worker->ctx, "Could not remove socket file descriptfor from epoll(): %m\n");
r = -errno;
goto ERROR;
}
// Handle incoming data
if (events[i].events & EPOLLIN) {
- r = handle_connection_recv(worker->conf, &worker->stats, fd);
+ r = handle_connection_recv(worker->ctx, &worker->stats, fd);
if (r < 0)
goto ERROR;
}
// Handle outgoing data
if (events[i].events & EPOLLOUT) {
- r = handle_connection_send(worker->conf, &worker->stats, fd);
+ r = handle_connection_send(worker->ctx, &worker->stats, fd);
if (r < 0)
goto ERROR;
}
}
}
- DEBUG(worker->conf, "Worker has gracefully terminated\n");
+ DEBUG(worker->ctx, "Worker has gracefully terminated\n");
ERROR:
// Store the return code
// Launch the worker
r = pthread_create(&worker->thread, NULL, fireperf_worker_main, worker);
if (r) {
- ERROR(worker->conf, "Could not launch the worker: %m\n");
+ ERROR(worker->ctx, "Could not launch the worker: %m\n");
}
return r;
.data.fd = socket,
};
- if (!worker->conf->keepalive_only)
+ if (!worker->ctx->keepalive_only)
ev.events |= EPOLLOUT;
// Add the socket to the loop
r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev);
if (r)
- ERROR(worker->conf, "Could not add the socket to epoll(): %m\n");
+ ERROR(worker->ctx, "Could not add the socket to epoll(): %m\n");
// We now have one more open connections
worker->stats.open_connections++;
#include "main.h"
-int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_config* conf);
+int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_ctx* ctx);
void fireperf_worker_free(struct fireperf_worker* worker);
int fireperf_worker_launch(struct fireperf_worker* worker);