#include <errno.h>
#include <netinet/tcp.h>
+#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "server.h"
#include "util.h"
+#define MAX_WORKERS 128
#define SOCKET_BACKLOG 1024
+struct fireperf_worker {
+ // Thread
+ pthread_t thread;
+
+ // Configuration
+ struct fireperf_config* conf;
+
+ // Collect stats
+ struct fireperf_stats stats;
+
+ // Client Socket
+ int client;
+
+ // Epoll
+ int epollfd;
+
+ // Return code
+ int r;
+};
+
+struct fireperf_server {
+ // Configuration
+ struct fireperf_config* conf;
+
+ // Collect stats
+ struct fireperf_stats stats;
+
+ // Workers
+ struct fireperf_worker* workers[MAX_WORKERS];
+ unsigned int num_workers;
+};
+
+static int is_worker_alive(struct fireperf_worker* worker) {
+ return pthread_tryjoin_np(worker->thread, NULL);
+}
+
+static int join_worker(struct fireperf_worker* worker) {
+ int r;
+
+ // Join the worker
+ r = pthread_join(worker->thread, NULL);
+ if (r) {
+ ERROR(worker->conf, "Could not join worker: %s\n", strerror(errno));
+ return -errno;
+ }
+
+ return 0;
+}
+
+static void free_worker(struct fireperf_worker* worker) {
+ free(worker);
+}
+
+static int free_terminated_workers(struct fireperf_server* server) {
+ int r;
+
+ for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+ struct fireperf_worker* worker = server->workers[i];
+
+ // Skip any empty slots
+ if (!worker)
+ continue;
+
+ // Skip any workers that are still alive
+ if (is_worker_alive(worker))
+ continue;
+
+ // Free worker
+ free_worker(worker);
+
+ // Clear the slot
+ server->workers[i] = NULL;
+ }
+}
+
+static void* run_worker(void* w) {
+ struct fireperf_worker* worker = w;
+ int ready;
+ int fd = -1;
+ int r;
+
+ struct epoll_event events[EPOLL_MAX_EVENTS];
+
+ DEBUG(worker->conf, "New worker launched as %lu...\n", pthread_self());
+
+ // We have exactly one open connection
+ worker->stats.open_connections = 1;
+
+ worker->epollfd = -1;
+
+ // Initialize epoll()
+ worker->epollfd = epoll_create1(0);
+ if (worker->epollfd < 0) {
+ ERROR(worker->conf, "Could not initialize epoll(): %s\n", strerror(errno));
+ r = -errno;
+ goto ERROR;
+ }
+
+ struct epoll_event ev = {
+ .events = EPOLLIN|EPOLLRDHUP,
+ .data.fd = worker->client,
+ };
+
+ if (!worker->conf->keepalive_only)
+ ev.events |= EPOLLOUT;
+
+ // Add the socket to the loop
+ r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, worker->client, &ev);
+ if (r) {
+ ERROR(worker->conf, "Could not add the socket to epoll(): %s\n", strerror(errno));
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Enter the main loop...
+ for (;;) {
+ ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
+ if (ready < 1) {
+ // We terminate gracefully when we receive a signal
+ if (errno == EINTR)
+ break;
+
+ ERROR(worker->conf, "epoll_wait() failed: %s\n", strerror(errno));
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Loop through all sockets that are ready...
+ for (int i = 0; i < ready; i++) {
+ fd = events[i].data.fd;
+
+ // Handle the client socket
+ if (worker->client == fd) {
+ if (events[i].events & EPOLLRDHUP) {
+ DEBUG(worker->conf, "Connection %d has closed\n", fd);
+
+ // There is no point in continuing any further
+ // after the client went away
+ goto ERROR;
+ }
+
+ // Handle incoming data
+ if (events[i].events & EPOLLIN) {
+ r = handle_connection_recv(worker->conf, &worker->stats, fd);
+ if (r < 0)
+ goto ERROR;
+ }
+
+ // Handle outgoing data
+ if (events[i].events & EPOLLOUT) {
+ r = handle_connection_send(worker->conf, &worker->stats, fd);
+ if (r < 0)
+ goto ERROR;
+ }
+ }
+ }
+ }
+
+ERROR:
+ if (worker->epollfd >= 0)
+ close(worker->epollfd);
+ if (worker->client >= 0)
+ close(worker->client);
+
+ // Store the return code
+ worker->r = r;
+
+ return &worker->r;
+}
+
+static int create_worker(struct fireperf_server* server, const int fd) {
+ struct fireperf_worker* worker = NULL;
+ int r;
+
+ DEBUG(server->conf, "Creating a new worker for %d\n", fd);
+
+ // Check if we have space for a new worker
+ if (server->num_workers >= MAX_WORKERS) {
+ ERROR(server->conf, "Maximum number of workers reached. Closing connection.\n");
+ return -ENOSPC;
+ }
+
+ // Allocate a new worker
+ worker = calloc(1, sizeof(*worker));
+ if (!worker)
+ return -errno;
+
+ // Store a reference to the configuration
+ worker->conf = server->conf;
+
+ // Store the client socket
+ worker->client = fd;
+
+ // Store the worker
+ for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+ if (server->workers[i])
+ continue;
+
+ // Store a reference in the first free slot
+ server->workers[i] = worker;
+ break;
+ }
+
+ // We now have more workers
+ ++server->num_workers;
+
+ // Run the worker
+ r = pthread_create(&worker->thread, NULL, run_worker, worker);
+ if (r) {
+ ERROR(server->conf, "Could not launch thread: %s\n", strerror(r));
+ return -r;
+ }
+
+ return 0;
+}
+
+static int cumulate_stats(struct fireperf_stats* stats, struct fireperf_server* server) {
+ struct fireperf_worker* worker = NULL;
+
+ // Reset open connections
+ stats->open_connections = 0;
+
+ // Fetch the total number of connections
+ stats->connections = server->stats.connections;
+
+ // Reset what has been sent in the last period
+ stats->bytes_received = 0;
+ stats->bytes_sent = 0;
+
+ // Reset the stats to the server's stats
+ stats->total_bytes_received = server->stats.total_bytes_received;
+ stats->total_bytes_sent = server->stats.total_bytes_sent;
+
+ // Add up all the worker's stats
+ for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+ worker = server->workers[i];
+ if (!worker)
+ continue;
+
+ stats->open_connections += worker->stats.open_connections;
+
+ stats->bytes_received += worker->stats.bytes_received;
+ stats->total_bytes_received += worker->stats.total_bytes_received;
+ stats->bytes_sent += worker->stats.bytes_sent;
+ stats->total_bytes_sent += worker->stats.total_bytes_sent;
+
+ // Reset
+ worker->stats.bytes_received = 0;
+ worker->stats.bytes_sent = 0;
+ }
+
+ return 0;
+}
+
static int enable_keepalive(struct fireperf_config* conf, int fd) {
// Enable keepalive
int flags = 1;
return -1;
}
-static int accept_connection(struct fireperf_config* conf, int sockfd) {
- struct sockaddr_in6 addr;
+static int accept_connection(struct fireperf_server* server, int sockfd) {
+ struct sockaddr_in6 addr = {};
+ int r;
+
socklen_t l = sizeof(addr);
int fd = -1;
} while (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
if (fd < 0) {
- ERROR(conf, "Could not accept a new connection: %s\n", strerror(errno));
+ ERROR(server->conf, "Could not accept a new connection: %s\n", strerror(errno));
return -1;
}
- DEBUG(conf, "New connection accepted on socket %d\n", fd);
+ DEBUG(server->conf, "New connection accepted on socket %d\n", fd);
+
+ // A connection has been opened
+ server->stats.open_connections++;
+ server->stats.connections++;
+
+ // Close connections immediately when -x is set
+ if (server->conf->close) {
+ DEBUG(server->conf, "Closing connection %d\n", fd);
+ r = 0;
+
+ goto ERROR;
+ }
+
+ // Create a new worker
+ r = create_worker(server, fd);
+ if (r < 0) {
+ ERROR(server->conf, "Could not create worker: %s\n", strerror(-r));
+ goto ERROR;
+ }
return fd;
+
+ERROR:
+ if (fd >= 0)
+ close(fd);
+
+ return r;
}
static int is_listening_socket(struct fireperf_config* conf, int* sockets, int fd) {
return 0;
}
-int fireperf_server(struct fireperf_config* conf, struct fireperf_stats* stats,
- int epollfd, int timerfd) {
+int fireperf_server(struct fireperf_config* conf, int epollfd, int timerfd) {
+ struct fireperf_server server = {
+ .conf = conf,
+ };
+ struct fireperf_stats stats = {};
+ uint64_t expirations = 0;
+ int r;
+
DEBUG(conf, "Launching " PACKAGE_NAME " in server mode\n");
int listening_sockets[conf->listening_sockets];
- int r = 1;
struct epoll_event ev;
struct epoll_event events[EPOLL_MAX_EVENTS];
// The listening socket
if (is_listening_socket(conf, listening_sockets, fd)) {
- int connfd = accept_connection(conf, fd);
- if (connfd < 0)
+ r = accept_connection(&server, fd);
+ if (r < 0)
goto ERROR;
- // Add the new socket to epoll()
- ev.data.fd = connfd;
- ev.events = EPOLLIN|EPOLLRDHUP;
- if (!conf->keepalive_only)
- ev.events |= EPOLLOUT;
-
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev)) {
- ERROR(conf, "Could not add socket file descriptor to epoll(): %s\n",
- strerror(errno));
- goto ERROR;
- }
-
- // A connection has been opened
- stats->open_connections++;
- stats->connections++;
-
// Handle timer events
} else if (fd == timerfd) {
- uint64_t expirations;
-
// Read from the timer to disarm it
ssize_t bytes_read = read(timerfd, &expirations, sizeof(expirations));
if (bytes_read <= 0) {
goto ERROR;
}
- r = fireperf_dump_stats(conf, stats, FIREPERF_MODE_SERVER);
+ // Free any terminated workers
+ r = free_terminated_workers(&server);
if (r)
goto ERROR;
- // Handle any connection events
- } else {
- if (events[i].events & EPOLLRDHUP) {
- DEBUG(conf, "Connection %d has closed\n", fd);
-
- // Remove the file descriptor from epoll()
- if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) {
- ERROR(conf, "Could not remove socket file descriptfor from epoll(): %s\n",
- strerror(errno));
- }
-
- // Free up any resources
- close(fd);
-
- // This connection is now closed
- stats->open_connections--;
-
- // Skip processing anything else, because it would be pointless
- continue;
- }
-
- // Close connections immediately when -x is set
- if (conf->close) {
- DEBUG(conf, "Closing connection %d\n", fd);
- close(fd);
-
- stats->open_connections--;
- continue;
- }
-
- // Handle incoming data
- if (events[i].events & EPOLLIN) {
- r = handle_connection_recv(conf, stats, fd);
- if (r < 0)
- goto ERROR;
- }
+ // Cumulate stats
+ r = cumulate_stats(&stats, &server);
+ if (r)
+ goto ERROR;
- // Handle outgoing data
- if (events[i].events & EPOLLOUT) {
- r = handle_connection_send(conf, stats, fd);
- if (r < 0)
- goto ERROR;
- }
+ // Print the stats
+ r = fireperf_dump_stats(conf, &stats, FIREPERF_MODE_SERVER);
+ if (r)
+ goto ERROR;
}
}
}
close(listening_sockets[i]);
}
+ // Wait for all workers to terminate
+ for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+ if (server.workers[i]) {
+ // Wait for the worker to terminate
+ join_worker(server.workers[i]);
+
+ // Free the worker
+ free_worker(server.workers[i]);
+
+ server.workers[i] = NULL;
+ }
+ }
+
return r;
}