#include <errno.h>
#include <netinet/tcp.h>
-#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "main.h"
#include "server.h"
#include "util.h"
+#include "worker.h"
#define MAX_WORKERS 128
#define SOCKET_BACKLOG 1024
-struct fireperf_worker {
- // Thread
- pthread_t thread;
-
- // Configuration
- struct fireperf_config* conf;
-
- // Collect stats
- struct fireperf_stats stats;
-
- // Client Socket
- int client;
-
- // Epoll
- int epollfd;
-
- // Return code
- int r;
-};
-
struct fireperf_server {
// Configuration
struct fireperf_config* conf;
- // Collect stats
- struct fireperf_stats stats;
+ // Count all connections
+ unsigned int connections;
// Workers
struct fireperf_worker* workers[MAX_WORKERS];
unsigned int num_workers;
};
-static int is_worker_alive(struct fireperf_worker* worker) {
- return pthread_tryjoin_np(worker->thread, NULL);
-}
-
-static int join_worker(struct fireperf_worker* worker) {
- int r;
-
- // Join the worker
- r = pthread_join(worker->thread, NULL);
- if (r) {
- ERROR(worker->conf, "Could not join worker: %s\n", strerror(errno));
- return -errno;
- }
-
- return 0;
-}
+static struct fireperf_stats get_stats(struct fireperf_server* server) {
+ const struct fireperf_stats* s = NULL;
-static void free_worker(struct fireperf_worker* worker) {
- free(worker);
-}
-
-static int free_terminated_workers(struct fireperf_server* server) {
- int r;
-
- for (unsigned int i = 0; i < MAX_WORKERS; i++) {
- struct fireperf_worker* worker = server->workers[i];
-
- // Skip any empty slots
- if (!worker)
- continue;
-
- // Skip any workers that are still alive
- if (is_worker_alive(worker))
- continue;
-
- // Free worker
- free_worker(worker);
-
- // Clear the slot
- server->workers[i] = NULL;
- }
-}
-
-static void* run_worker(void* w) {
- struct fireperf_worker* worker = w;
- int ready;
- int fd = -1;
- int r;
-
- struct epoll_event events[EPOLL_MAX_EVENTS];
-
- DEBUG(worker->conf, "New worker launched as %lu...\n", pthread_self());
-
- // We have exactly one open connection
- worker->stats.open_connections = 1;
-
- worker->epollfd = -1;
-
- // Initialize epoll()
- worker->epollfd = epoll_create1(0);
- if (worker->epollfd < 0) {
- ERROR(worker->conf, "Could not initialize epoll(): %s\n", strerror(errno));
- r = -errno;
- goto ERROR;
- }
-
- struct epoll_event ev = {
- .events = EPOLLIN|EPOLLRDHUP,
- .data.fd = worker->client,
+ struct fireperf_stats stats = {
+ .connections = server->connections,
};
- if (!worker->conf->keepalive_only)
- ev.events |= EPOLLOUT;
-
- // Add the socket to the loop
- r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, worker->client, &ev);
- if (r) {
- ERROR(worker->conf, "Could not add the socket to epoll(): %s\n", strerror(errno));
- r = -errno;
- goto ERROR;
- }
-
- // Enter the main loop...
- for (;;) {
- ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
- if (ready < 1) {
- // We terminate gracefully when we receive a signal
- if (errno == EINTR)
- break;
-
- ERROR(worker->conf, "epoll_wait() failed: %s\n", strerror(errno));
- r = -errno;
- goto ERROR;
- }
-
- // Loop through all sockets that are ready...
- for (int i = 0; i < ready; i++) {
- fd = events[i].data.fd;
-
- // Handle the client socket
- if (worker->client == fd) {
- if (events[i].events & EPOLLRDHUP) {
- DEBUG(worker->conf, "Connection %d has closed\n", fd);
-
- // There is no point in continuing any further
- // after the client went away
- goto ERROR;
- }
-
- // Handle incoming data
- if (events[i].events & EPOLLIN) {
- r = handle_connection_recv(worker->conf, &worker->stats, fd);
- if (r < 0)
- goto ERROR;
- }
-
- // Handle outgoing data
- if (events[i].events & EPOLLOUT) {
- r = handle_connection_send(worker->conf, &worker->stats, fd);
- if (r < 0)
- goto ERROR;
- }
- }
- }
- }
-
-ERROR:
- if (worker->epollfd >= 0)
- close(worker->epollfd);
- if (worker->client >= 0)
- close(worker->client);
-
- // Store the return code
- worker->r = r;
-
- return &worker->r;
-}
-
-static int create_worker(struct fireperf_server* server, const int fd) {
- struct fireperf_worker* worker = NULL;
- int r;
-
- DEBUG(server->conf, "Creating a new worker for %d\n", fd);
-
- // Check if we have space for a new worker
- if (server->num_workers >= MAX_WORKERS) {
- ERROR(server->conf, "Maximum number of workers reached. Closing connection.\n");
- return -ENOSPC;
- }
-
- // Allocate a new worker
- worker = calloc(1, sizeof(*worker));
- if (!worker)
- return -errno;
-
- // Store a reference to the configuration
- worker->conf = server->conf;
-
- // Store the client socket
- worker->client = fd;
-
- // Store the worker
+ // Add up everything from all workers
for (unsigned int i = 0; i < MAX_WORKERS; i++) {
- if (server->workers[i])
+ if (!server->workers[i])
continue;
- // Store a reference in the first free slot
- server->workers[i] = worker;
- break;
- }
-
- // We now have more workers
- ++server->num_workers;
-
- // Run the worker
- r = pthread_create(&worker->thread, NULL, run_worker, worker);
- if (r) {
- ERROR(server->conf, "Could not launch thread: %s\n", strerror(r));
- return -r;
- }
-
- return 0;
-}
-
-static int cumulate_stats(struct fireperf_stats* stats, struct fireperf_server* server) {
- struct fireperf_worker* worker = NULL;
-
- // Reset open connections
- stats->open_connections = 0;
-
- // Fetch the total number of connections
- stats->connections = server->stats.connections;
-
- // Reset what has been sent in the last period
- stats->bytes_received = 0;
- stats->bytes_sent = 0;
-
- // Reset the stats to the server's stats
- stats->total_bytes_received = server->stats.total_bytes_received;
- stats->total_bytes_sent = server->stats.total_bytes_sent;
-
- // Add up all the worker's stats
- for (unsigned int i = 0; i < MAX_WORKERS; i++) {
- worker = server->workers[i];
- if (!worker)
+ // Fetch stats
+ s = fireperf_worker_get_stats(server->workers[i]);
+ if (!s)
continue;
- stats->open_connections += worker->stats.open_connections;
+ // Sum up all open connections
+ stats.open_connections += s->open_connections;
- stats->bytes_received += worker->stats.bytes_received;
- stats->total_bytes_received += worker->stats.total_bytes_received;
- stats->bytes_sent += worker->stats.bytes_sent;
- stats->total_bytes_sent += worker->stats.total_bytes_sent;
-
- // Reset
- worker->stats.bytes_received = 0;
- worker->stats.bytes_sent = 0;
+ // Sum up the transferred data
+ stats.bytes_received += s->bytes_received;
+ stats.total_bytes_received += s->total_bytes_received;
+ stats.bytes_sent += s->bytes_sent;
+ stats.total_bytes_sent += s->total_bytes_sent;
}
- return 0;
+ return stats;
}
static int enable_keepalive(struct fireperf_config* conf, int fd) {
return -1;
}
+/*
+ This function selects the worker that is used for the an incoming connection
+*/
+static struct fireperf_worker* fetch_worker(struct fireperf_server* server) {
+ struct fireperf_worker* worker = NULL;
+ int r;
+
+ /*
+ XXX TODO
+ To keep things simple, we create a new worker for each call.
+
+ This should be replaced by a method that searches for the worker with
+ the least amount of connections and start up to as many workers as we
+ have CPU cores.
+ */
+
+ // Create a new worker
+ r = fireperf_worker_create(&worker, server->conf);
+ if (r < 0) {
+ ERROR(server->conf, "Could not create worker: %s\n", strerror(-r));
+ goto ERROR;
+ }
+
+ // Launch the worker
+ r = fireperf_worker_launch(worker);
+ if (r < 0) {
+ ERROR(server->conf, "Could not launch worker: %s\n", strerror(-r));
+ goto ERROR;
+ }
+
+ // Store a reference
+ server->workers[server->num_workers++] = worker;
+
+ return worker;
+
+ERROR:
+ if (worker)
+ fireperf_worker_free(worker);
+
+ return NULL;
+}
+
static int accept_connection(struct fireperf_server* server, int sockfd) {
+ struct fireperf_worker* worker = NULL;
struct sockaddr_in6 addr = {};
int r;
DEBUG(server->conf, "New connection accepted on socket %d\n", fd);
// A connection has been opened
- server->stats.open_connections++;
- server->stats.connections++;
+ server->connections++;
// Close connections immediately when -x is set
if (server->conf->close) {
- DEBUG(server->conf, "Closing connection %d\n", fd);
r = 0;
goto ERROR;
}
- // Create a new worker
- r = create_worker(server, fd);
+ // Find a worker to delegate this connection to
+ worker = fetch_worker(server);
+
+ // Close the connection if we could not find a worker
+ if (!worker) {
+ ERROR(server->conf, "Could not find a worker that could handle a new connection\n");
+ r = -EBUSY;
+ goto ERROR;
+ }
+
+ // Delegate the connection
+ r = fireperf_worker_delegate(worker, fd);
if (r < 0) {
- ERROR(server->conf, "Could not create worker: %s\n", strerror(-r));
+ ERROR(server->conf, "Could not delegate a new connection to a worker: %s\n", strerror(-r));
goto ERROR;
}
- return fd;
+ return 0;
ERROR:
- if (fd >= 0)
+ if (fd >= 0) {
+ DEBUG(server->conf, "Closing connection %d\n", fd);
+
close(fd);
+ }
return r;
}
goto ERROR;
}
- // Free any terminated workers
- r = free_terminated_workers(&server);
- if (r)
- goto ERROR;
-
// Cumulate stats
- r = cumulate_stats(&stats, &server);
- if (r)
- goto ERROR;
+ stats = get_stats(&server);
// Print the stats
r = fireperf_dump_stats(conf, &stats, FIREPERF_MODE_SERVER);
close(listening_sockets[i]);
}
- // Wait for all workers to terminate
- for (unsigned int i = 0; i < MAX_WORKERS; i++) {
- if (server.workers[i]) {
- // Wait for the worker to terminate
- join_worker(server.workers[i]);
-
- // Free the worker
- free_worker(server.workers[i]);
-
- server.workers[i] = NULL;
- }
- }
-
return r;
}
// Configuration
struct fireperf_config* conf;
+ // Collect stats
+ struct fireperf_stats stats;
+
// Epoll
int epollfd;
-};
-
-static void fireperf_worker_free(struct fireperf_worker* worker) {
- if (worker->epollfd >= 0)
- close(worker->epollfd);
- free(worker);
-}
+ // Exit Code
+ int r;
+};
int fireperf_worker_create(struct fireperf_worker** worker, struct fireperf_config* conf) {
struct fireperf_worker* w = NULL;
return r;
}
+void fireperf_worker_free(struct fireperf_worker* worker) {
+ if (worker->epollfd >= 0)
+ close(worker->epollfd);
+
+ free(worker);
+}
+
/*
The main function of the worker.
*/
static void* fireperf_worker_main(void* w) {
struct fireperf_worker* worker = w;
+ int ready = 0;
+ int r;
DEBUG(worker->conf, "New worker launched as %lu\n", pthread_self());
- return 0;
+ struct epoll_event events[EPOLL_MAX_EVENTS];
+
+ // Enter the main loop...
+ for (;;) {
+ ready = epoll_wait(worker->epollfd, events, EPOLL_MAX_EVENTS, -1);
+ if (ready < 1) {
+ // We terminate gracefully when we receive a signal
+ if (errno == EINTR)
+ break;
+
+ ERROR(worker->conf, "epoll_wait() failed: %m\n");
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Loop through all sockets that are ready...
+ for (int i = 0; i < ready; i++) {
+ int fd = events[i].data.fd;
+
+ // Handle closed connections
+ if (events[i].events & EPOLLRDHUP) {
+ DEBUG(worker->conf, "Connection %d has closed\n", fd);
+
+ // We now have one fewer connections
+ worker->stats.open_connections--;
+
+ // Remove the file descriptor from epoll()
+ r = epoll_ctl(worker->epollfd, EPOLL_CTL_DEL, fd, NULL);
+ if (r) {
+ ERROR(worker->conf, "Could not remove socket file descriptfor from epoll(): %m\n");
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Free up any resources
+ close(fd);
+
+ // There is no point in continuing any further after the connection closed
+ break;
+ }
+
+ // Handle incoming data
+ if (events[i].events & EPOLLIN) {
+ r = handle_connection_recv(worker->conf, &worker->stats, fd);
+ if (r < 0)
+ goto ERROR;
+ }
+
+ // Handle outgoing data
+ if (events[i].events & EPOLLOUT) {
+ r = handle_connection_send(worker->conf, &worker->stats, fd);
+ if (r < 0)
+ goto ERROR;
+ }
+ }
+ }
+
+ DEBUG(worker->conf, "Worker has gracefully terminated\n");
+
+ERROR:
+ // Store the return code
+ worker->r = r;
+
+ return &worker->r;
}
int fireperf_worker_launch(struct fireperf_worker* worker) {
return r;
}
+int fireperf_worker_terminate(struct fireperf_worker* worker) {
+ // If the worker has terminated, we return its exit code
+ if (!fireperf_worker_is_alive(worker))
+ return worker->r;
+
+ // XXX Otherwise we need to do something else
+ return 0;
+}
+
int fireperf_worker_is_alive(struct fireperf_worker* worker) {
return pthread_tryjoin_np(worker->thread, NULL);
}
+
+int fireperf_worker_delegate(struct fireperf_worker* worker, int socket) {
+ int r;
+
+ struct epoll_event ev = {
+ .events = EPOLLIN|EPOLLRDHUP,
+ .data.fd = socket,
+ };
+
+ if (!worker->conf->keepalive_only)
+ ev.events |= EPOLLOUT;
+
+ // Add the socket to the loop
+ r = epoll_ctl(worker->epollfd, EPOLL_CTL_ADD, socket, &ev);
+ if (r)
+ ERROR(worker->conf, "Could not add the socket to epoll(): %m\n");
+
+ // We now have one more open connections
+ worker->stats.open_connections++;
+
+ return r;
+}
+
+const struct fireperf_stats* fireperf_worker_get_stats(struct fireperf_worker* worker) {
+ return &worker->stats;
+}