#include "clients.h"
-#include <pthread.h>
+#include <sys/queue.h>
#include "common.h"
#include "log.h"
#include "data_structure/uthash_nonfatal.h"
UT_hash_handle hh;
};
-/** Hash table of clients */
-static struct hashable_client *table;
+struct thread_node {
+ pthread_t tid;
+ SLIST_ENTRY(thread_node) next;
+};
+
+/*
+ * Thread ids are stored apart so that the caller can join each thread at the
+ * end.
+ *
+ * The join should be made when the db is cleared, so the main process should
+ * do it.
+ */
+SLIST_HEAD(thread_list, thread_node);
+
+/** Hash table of clients and threads */
+static struct clients_table {
+ struct hashable_client *clients;
+ struct thread_list threads;
+} db;
+
/** Read/write lock, which protects @table and its inhabitants. */
static pthread_rwlock_t lock;
-/** Serial number from which deltas must be stored */
-static serial_t min_serial;
int
clients_db_init(void)
{
int error;
- table = NULL;
+ db.clients = NULL;
+ SLIST_INIT(&db.threads);
+
error = pthread_rwlock_init(&lock, NULL);
if (error)
return pr_errno(error, "pthread_rwlock_init() errored");
- min_serial = 0;
return 0;
}
-static int
-create_client(struct rtr_client *client, struct hashable_client **result)
+static struct hashable_client *
+create_client(int fd, struct sockaddr_storage addr, pthread_t tid)
{
- struct hashable_client *node;
+ struct hashable_client *client;
- node = malloc(sizeof(struct hashable_client));
- if (node == NULL)
- return pr_enomem();
+ client = malloc(sizeof(struct hashable_client));
+ if (client == NULL)
+ return NULL;
+ /* Needed by uthash */
+ memset(client, 0, sizeof(struct hashable_client));
- node->meat.fd = client->fd;
- node->meat.serial_number_set = false;
+ client->meat.fd = fd;
+ client->meat.serial_number_set = false;
+ client->meat.addr = addr;
- *result = node;
- return 0;
+ return client;
}
/*
* If the client whose file descriptor is @fd isn't already stored, store it.
*/
int
-clients_add(struct rtr_client *client)
+clients_add(int fd, struct sockaddr_storage addr, pthread_t tid)
{
- struct hashable_client *new_client = NULL;
+ struct hashable_client *new_client;
struct hashable_client *old_client;
- int error;
+ struct thread_node *new_thread;
- error = create_client(client, &new_client);
- if (error)
- return error;
+ new_client = create_client(fd, addr, tid);
+ if (new_client == NULL)
+ return pr_enomem();
+
+ new_thread = malloc(sizeof(struct thread_node));
+ if (new_thread == NULL) {
+ free(new_client);
+ return pr_enomem();
+ }
+ new_thread->tid = tid;
rwlock_write_lock(&lock);
- HASH_FIND_INT(table, &client->fd, old_client);
- if (old_client == NULL) {
- errno = 0;
- HASH_ADD_INT(table, meat.fd, new_client);
- if (errno) {
- rwlock_unlock(&lock);
- free(new_client);
- return -pr_errno(errno, "Couldn't create client");
- }
- new_client = NULL;
+ errno = 0;
+ HASH_REPLACE(hh, db.clients, meat.fd, sizeof(new_client->meat.fd),
+ new_client, old_client);
+ if (errno) {
+ rwlock_unlock(&lock);
+ free(new_client);
+ return -pr_errno(errno, "Client couldn't be stored");
}
+ if (old_client != NULL)
+ free(old_client);
- rwlock_unlock(&lock);
+ SLIST_INSERT_HEAD(&db.threads, new_thread, next);
- if (new_client != NULL)
- free(new_client);
+ rwlock_unlock(&lock);
return 0;
}
+int
+clients_get_addr(int fd, struct sockaddr_storage *addr)
+{
+ struct hashable_client *client;
+ int result;
+
+ result = -ENOENT;
+ rwlock_write_lock(&lock);
+
+ HASH_FIND_INT(db.clients, &fd, client);
+ if (client != NULL) {
+ *addr = client->meat.addr;
+ result = 0;
+ }
+
+ rwlock_unlock(&lock);
+
+ return result;
+}
+
void
clients_update_serial(int fd, serial_t serial)
{
struct hashable_client *cur_client;
rwlock_write_lock(&lock);
- HASH_FIND_INT(table, &fd, cur_client);
+ HASH_FIND_INT(db.clients, &fd, cur_client);
if (cur_client == NULL)
goto unlock;
cur_client->meat.serial_number = serial;
+ cur_client->meat.serial_number_set = true;
unlock:
rwlock_unlock(&lock);
}
-serial_t
-clients_get_min_serial(void)
+int
+clients_get_min_serial(serial_t *result)
{
- struct hashable_client *current, *ptr;
- serial_t result;
+ struct hashable_client *current, *tmp;
+ int retval;
- rwlock_write_lock(&lock);
- if (HASH_COUNT(table) == 0)
+ retval = -ENOENT;
+ rwlock_read_lock(&lock);
+ if (HASH_COUNT(db.clients) == 0)
goto unlock;
- min_serial = table->meat.serial_number;
- HASH_ITER(hh, table, current, ptr)
- if (current->meat.serial_number < min_serial)
- min_serial = current->meat.serial_number;
+ HASH_ITER(hh, db.clients, current, tmp) {
+ if (!current->meat.serial_number_set)
+ continue;
+ if (retval) {
+ *result = current->meat.serial_number;
+ retval = 0;
+ } else if (current->meat.serial_number < *result)
+ *result = current->meat.serial_number;
+ }
unlock:
- result = min_serial;
rwlock_unlock(&lock);
-
- return result;
+ return retval;
}
void
rwlock_write_lock(&lock);
- HASH_FIND_INT(table, &fd, client);
+ HASH_FIND_INT(db.clients, &fd, client);
if (client != NULL) {
- HASH_DEL(table, client);
+ HASH_DEL(db.clients, client);
free(client);
}
int
clients_foreach(clients_foreach_cb cb, void *arg)
{
- struct hashable_client *client;
+ struct hashable_client *client, *tmp;
int error;
error = rwlock_read_lock(&lock);
if (error)
return error;
- for (client = table; client != NULL; client = client->hh.next) {
+ HASH_ITER(hh, db.clients, client, tmp) {
error = cb(&client->meat, arg);
if (error)
break;
return error;
}
+/*
+ * Destroy the clients DB, the @join_thread_cb will be made for each thread
+ * that was started by the parent process (@arg will be sent at that call).
+ */
void
-clients_db_destroy(void)
+clients_db_destroy(join_thread_cb cb, void *arg)
{
struct hashable_client *node, *tmp;
+ struct thread_node *thread;
- HASH_ITER(hh, table, node, tmp) {
- HASH_DEL(table, node);
+ HASH_ITER(hh, db.clients, node, tmp) {
+ HASH_DEL(db.clients, node);
free(node);
}
+ while (!SLIST_EMPTY(&db.threads)) {
+ thread = SLIST_FIRST(&db.threads);
+ SLIST_REMOVE_HEAD(&db.threads, next);
+
+ /* Not much to do on failure */
+ cb(thread->tid, arg);
+ free(thread);
+ }
+
pthread_rwlock_destroy(&lock); /* Nothing to do with error code */
}
#ifndef SRC_CLIENTS_H_
#define SRC_CLIENTS_H_
+#include <pthread.h>
#include <stdbool.h>
+#include <netinet/in.h>
#include "rtr/pdu.h"
#include "rtr/db/vrp.h"
struct client {
int fd;
+ struct sockaddr_storage addr;
serial_t serial_number;
bool serial_number_set;
int clients_db_init(void);
-int clients_add(struct rtr_client *);
+int clients_add(int, struct sockaddr_storage, pthread_t);
void clients_update_serial(int, serial_t);
void clients_forget(int);
typedef int (*clients_foreach_cb)(struct client const *, void *);
int clients_foreach(clients_foreach_cb, void *);
-serial_t clients_get_min_serial(void);
+int clients_get_min_serial(serial_t *);
+int clients_get_addr(int, struct sockaddr_storage *);
-void clients_db_destroy(void);
+typedef int (*join_thread_cb)(pthread_t, void *);
+void clients_db_destroy(join_thread_cb, void *);
#endif /* SRC_CLIENTS_H_ */
error = vrps_init();
if (error)
goto just_quit;
- error = clients_db_init();
- if (error)
- goto revert_vrps;
error = rtr_listen();
- clients_db_destroy();
-revert_vrps:
vrps_destroy();
just_quit:
return error;
/** ROA changes to @base over time. */
struct deltas_db deltas;
- /*
- * TODO (whatever) Maybe rename this. It's not really the current
- * serial; it's the next one.
- */
- serial_t current_serial;
+ serial_t next_serial;
uint16_t v0_session_id;
uint16_t v1_session_id;
time_t last_modified_date;
* "desynchronization" (more at RFC 6810 'Glossary' and
* 'Fields of a PDU')
*/
- state.current_serial = START_SERIAL;
+ state.next_serial = START_SERIAL;
/* Get the bits that'll fit in session_id */
state.v0_session_id = time(NULL) & 0xFFFF;
return 0;
}
+/*
+ * Make an empty dummy delta array.
+ * It's annoying, but temporary. (Until it expires.) Otherwise, it'd be a pain
+ * to have to check NULL delta_group.deltas all the time.
+ */
+static int
+create_empty_delta(struct deltas **deltas)
+{
+ struct delta_group deltas_node;
+ int error;
+
+ error = deltas_create(deltas);
+ if (error)
+ return error;
+
+ deltas_node.serial = state.next_serial;
+ deltas_node.deltas = *deltas;
+ error = deltas_db_add(&state.deltas, &deltas_node);
+ if (error)
+ deltas_refput(*deltas);
+ return error;
+}
+
/**
* Reallocate the array of @db starting at @start, the length and capacity are
* calculated according to the new start.
*/
-static void
+static int
resize_deltas_db(struct deltas_db *db, struct delta_group *start)
{
struct delta_group *tmp, *ptr;
while (db->len < db->capacity / 2)
db->capacity /= 2;
tmp = malloc(sizeof(struct delta_group) * db->capacity);
- if (tmp == NULL) {
- pr_enomem();
- return;
- }
+ if (tmp == NULL)
+ return pr_enomem();
+
memcpy(tmp, start, db->len * sizeof(struct delta_group));
/* Release memory allocated */
for (ptr = db->array; ptr < start; ptr++)
deltas_refput(ptr->deltas);
free(db->array);
db->array = tmp;
+
+ return 0;
}
-static void
-vrps_purge(void)
+/*
+ * Lock must be requested before calling this function
+ */
+static int
+vrps_purge(struct deltas **deltas)
{
struct delta_group *group;
array_index i;
- uint32_t min_serial;
+ serial_t min_serial;
- min_serial = clients_get_min_serial();
+ if (clients_get_min_serial(&min_serial) != 0) {
+ /* Nobody will need deltas, just leave an empty one */
+ deltas_refput(*deltas);
+ deltas_db_cleanup(&state.deltas, deltagroup_cleanup);
+ deltas_db_init(&state.deltas);
+ return create_empty_delta(deltas);
+ }
- /* Assume is ordered by serial, so get the new initial pointer */
+ /* Assume its ordered by serial, so get the new initial pointer */
ARRAYLIST_FOREACH(&state.deltas, group, i)
if (group->serial >= min_serial)
break;
- /* Is the first element or reached end, nothing to purge */
+ /* Its the first element or reached end, nothing to purge */
if (group == state.deltas.array ||
(group - state.deltas.array) == state.deltas.len)
- return;
+ return 0;
- resize_deltas_db(&state.deltas, group);
+ return resize_deltas_db(&state.deltas, group);
}
int
struct roa_table *new_base;
struct deltas *deltas; /* Deltas in raw form */
struct delta_group deltas_node; /* Deltas in database node form */
+ serial_t min_serial;
int error;
*changed = false;
goto revert_deltas; /* error == 0 is good */
}
- deltas_node.serial = state.current_serial;
- deltas_node.deltas = deltas;
- error = deltas_db_add(&state.deltas, &deltas_node);
- if (error) {
- rwlock_unlock(&lock);
- goto revert_deltas;
+ /* Just store deltas if someone will care about it */
+ if (clients_get_min_serial(&min_serial) == 0) {
+ deltas_node.serial = state.next_serial;
+ deltas_node.deltas = deltas;
+ error = deltas_db_add(&state.deltas, &deltas_node);
+ if (error) {
+ rwlock_unlock(&lock);
+ goto revert_deltas;
+ }
}
/*
old_base = state.base;
/* Remove unnecessary deltas */
- vrps_purge();
-
- } else {
- /*
- * Make an empty dummy delta array.
- * It's annoying, but temporary. (Until it expires.)
- * Otherwise, it'd be a pain to have to check NULL
- * delta_group.deltas all the time.
- */
- error = deltas_create(&deltas);
+ error = vrps_purge(&deltas);
if (error) {
rwlock_unlock(&lock);
goto revert_base;
}
-
- deltas_node.serial = state.current_serial;
- deltas_node.deltas = deltas;
- error = deltas_db_add(&state.deltas, &deltas_node);
+ } else {
+ error = create_empty_delta(&deltas);
if (error) {
rwlock_unlock(&lock);
- goto revert_deltas;
+ goto revert_base;
}
}
*changed = true;
state.base = new_base;
- state.current_serial++;
+ state.next_serial++;
rwlock_unlock(&lock);
if (state.base != NULL) {
error = roa_table_foreach_roa(state.base, cb, arg);
- *serial = state.current_serial - 1;
+ *serial = state.next_serial - 1;
} else {
error = -EAGAIN;
}
return error;
if (state.base != NULL)
- *result = state.current_serial - 1;
+ *result = state.next_serial - 1;
else
error = -EAGAIN;
#define RTR_V0 0
#define RTR_V1 1
-struct rtr_client {
- int fd;
- struct sockaddr_storage addr;
-};
-
/** A request from an RTR client. */
struct rtr_request {
/** Raw bytes. */
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
-#include <sys/queue.h>
#include "config.h"
#include "clients.h"
struct sigaction act;
-struct thread_node {
+struct thread_param {
+ int fd;
pthread_t tid;
- struct rtr_client client;
- SLIST_ENTRY(thread_node) next;
+ struct sockaddr_storage addr;
};
-SLIST_HEAD(thread_list, thread_node) threads;
-
static int
init_addrinfo(struct addrinfo **result)
{
}
static void
-print_close_failure(int error, struct sockaddr_storage *sockaddr)
+print_close_failure(int error, int fd)
{
+ struct sockaddr_storage sockaddr;
char buffer[INET6_ADDRSTRLEN];
void *addr = NULL;
char const *addr_str;
- switch (sockaddr->ss_family) {
+ if (clients_get_addr(fd, &sockaddr) != 0) {
+ addr_str = "(unknown)";
+ goto done;
+ }
+ switch (sockaddr.ss_family) {
case AF_INET:
- addr = &((struct sockaddr_in *) sockaddr)->sin_addr;
+ addr = &((struct sockaddr_in *) &sockaddr)->sin_addr;
break;
case AF_INET6:
- addr = &((struct sockaddr_in6 *) sockaddr)->sin6_addr;
+ addr = &((struct sockaddr_in6 *) &sockaddr)->sin6_addr;
break;
default:
addr_str = "(protocol unknown)";
goto done;
}
- addr_str = inet_ntop(sockaddr->ss_family, addr, buffer,
+ addr_str = inet_ntop(sockaddr.ss_family, addr, buffer,
INET6_ADDRSTRLEN);
if (addr_str == NULL)
addr_str = "(unprintable address)";
pr_errno(error, "close() failed on socket of client %s", addr_str);
}
-static void *
-end_client(struct rtr_client *client)
+static void
+end_client(int fd)
{
- if (close(client->fd) != 0)
- print_close_failure(errno, &client->addr);
-
- clients_forget(client->fd);
- return NULL;
+ if (close(fd) != 0)
+ print_close_failure(errno, fd);
}
/*
* The client socket threads' entry routine.
+ * @arg must be released.
*/
static void *
client_thread_cb(void *arg)
{
- struct rtr_client *client = arg;
struct pdu_metadata const *meta;
struct rtr_request request;
+ struct thread_param param;
int error;
+ memcpy(¶m, arg, sizeof(param));
+ free(arg);
+
+ error = clients_add(param.fd, param.addr, param.tid);
+ if (error) {
+ close(param.fd);
+ return NULL;
+ }
while (true) { /* For each PDU... */
- error = pdu_load(client->fd, &request, &meta);
+ error = pdu_load(param.fd, &request, &meta);
if (error)
break;
- error = meta->handle(client->fd, &request);
+ error = meta->handle(param.fd, &request);
clean_request(&request, meta);
if (error)
break;
}
- return end_client(client);
+ end_client(param.fd);
+ clients_forget(param.fd);
+
+ return NULL;
}
/*
handle_client_connections(int server_fd)
{
struct sockaddr_storage client_addr;
- struct thread_node *new_thread;
+ struct thread_param *param;
socklen_t sizeof_client_addr;
int client_fd;
int error;
* So don't interrupt the thread when this happens.
*/
- /*
- * TODO this is more complicated than it needs to be.
- * We have a client hash table and a thread linked list.
- * These two contain essentially the same entries. It's
- * redundant.
- */
-
- new_thread = malloc(sizeof(struct thread_node));
- if (new_thread == NULL) {
+ param = malloc(sizeof(struct thread_param));
+ if (param == NULL) {
/* No error response PDU on memory allocation. */
- pr_err("Couldn't create thread struct");
+ pr_err("Couldn't create thread parameters struct");
close(client_fd);
continue;
}
+ param->fd = client_fd;
+ param->addr = client_addr;
- new_thread->client.fd = client_fd;
- new_thread->client.addr = client_addr;
-
- error = clients_add(&new_thread->client);
- if (error) {
- /*
- * Presently, clients_add() can only fail due to alloc
- * failure. No error report PDU.
- */
- free(new_thread);
- close(client_fd);
- continue;
- }
-
- error = pthread_create(&new_thread->tid, NULL,
- client_thread_cb, &new_thread->client);
+ error = pthread_create(¶m->tid, NULL,
+ client_thread_cb, param);
if (error && error != EAGAIN)
err_pdu_send_internal_error(client_fd);
if (error) {
pr_errno(error, "Could not spawn the client's thread");
- clients_forget(client_fd);
- free(new_thread);
close(client_fd);
- continue;
+ free(param);
}
- SLIST_INSERT_HEAD(&threads, new_thread, next);
-
} while (true);
return 0; /* Unreachable. */
return error;
}
-/* Terminates client threads as gracefully as I know how to. */
+/*
+ * Receive @arg to be called as a clients_foreach_cb
+ */
+static int
+kill_client(struct client const *client, void *arg)
+{
+ end_client(client->fd);
+ /* Don't call clients_forget to avoid deadlock! */
+ return 0;
+}
+
static void
-wait_threads(void)
+end_clients(void)
{
- struct thread_node *ptr;
+ clients_foreach(kill_client, NULL);
+ /* Let the clients be deleted when clients DB is destroyed */
+}
- while (!SLIST_EMPTY(&threads)) {
- ptr = SLIST_FIRST(&threads);
- SLIST_REMOVE_HEAD(&threads, next);
- /*
- * If the close fails, the thread might still be using the
- * thread_param variables, so leak instead.
- */
- if (close_thread(ptr->tid, "Client") == 0)
- free(ptr);
- }
+static int
+join_thread(pthread_t tid, void *arg)
+{
+ return close_thread(tid, "Client");
}
/*
if (error)
return error;
- /* Server ready, start updates thread */
- error = updates_daemon_start();
+ /* Server ready, start everything else */
+ error = clients_db_init();
if (error)
goto revert_server_socket;
- /* Init global vars */
- SLIST_INIT(&threads);
+ error = updates_daemon_start();
+ if (error)
+ goto revert_clients_db;
error = init_signal_handler();
if (error)
error = handle_client_connections(server_fd);
- wait_threads();
+ end_clients();
revert_updates_daemon:
updates_daemon_destroy();
+revert_clients_db:
+ clients_db_destroy(join_thread, NULL);
revert_server_socket:
close(server_fd);
return error;