From: pcarana Date: Mon, 20 May 2019 22:06:50 +0000 (-0500) Subject: Attend some TODOs on clients and vrps DB X-Git-Tag: v0.0.2~29 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cd32eaa4f7ae5ddfcc8c5a69947f5d24ddafd836;p=thirdparty%2FFORT-validator.git Attend some TODOs on clients and vrps DB +Clients: - Unify thread's DB and clients DB; threads IDs are remembered apart from client FDs, so that the main process can join all threads at the end. - Remove (now unnecessary) struct rtr_client. - End clients gracefully when the server is stopped (release memory, close sockets, join threads) and when communication between server-client stops in a regular way (eg. all PDUs were sent and the communication is terminated). - Add a client to the DB until its own thread has started. - Init clients DB from rtr.c instead of main.c. +VRPS: - Remove min_serial storage, is useless; calculate it instead when requested. - Purge deltas when needed and don't store them if there are no clients. - Rename 'current_serial' to 'next_serial'. --- diff --git a/src/clients.c b/src/clients.c index 661b0bfe..75184160 100644 --- a/src/clients.c +++ b/src/clients.c @@ -1,6 +1,6 @@ #include "clients.h" -#include +#include #include "common.h" #include "log.h" #include "data_structure/uthash_nonfatal.h" @@ -11,114 +11,163 @@ struct hashable_client { UT_hash_handle hh; }; -/** Hash table of clients */ -static struct hashable_client *table; +struct thread_node { + pthread_t tid; + SLIST_ENTRY(thread_node) next; +}; + +/* + * Thread ids are stored apart so that the caller can join each thread at the + * end. + * + * The join should be made when the db is cleared, so the main process should + * do it. + */ +SLIST_HEAD(thread_list, thread_node); + +/** Hash table of clients and threads */ +static struct clients_table { + struct hashable_client *clients; + struct thread_list threads; +} db; + /** Read/write lock, which protects @table and its inhabitants. */ static pthread_rwlock_t lock; -/** Serial number from which deltas must be stored */ -static serial_t min_serial; int clients_db_init(void) { int error; - table = NULL; + db.clients = NULL; + SLIST_INIT(&db.threads); + error = pthread_rwlock_init(&lock, NULL); if (error) return pr_errno(error, "pthread_rwlock_init() errored"); - min_serial = 0; return 0; } -static int -create_client(struct rtr_client *client, struct hashable_client **result) +static struct hashable_client * +create_client(int fd, struct sockaddr_storage addr, pthread_t tid) { - struct hashable_client *node; + struct hashable_client *client; - node = malloc(sizeof(struct hashable_client)); - if (node == NULL) - return pr_enomem(); + client = malloc(sizeof(struct hashable_client)); + if (client == NULL) + return NULL; + /* Needed by uthash */ + memset(client, 0, sizeof(struct hashable_client)); - node->meat.fd = client->fd; - node->meat.serial_number_set = false; + client->meat.fd = fd; + client->meat.serial_number_set = false; + client->meat.addr = addr; - *result = node; - return 0; + return client; } /* * If the client whose file descriptor is @fd isn't already stored, store it. */ int -clients_add(struct rtr_client *client) +clients_add(int fd, struct sockaddr_storage addr, pthread_t tid) { - struct hashable_client *new_client = NULL; + struct hashable_client *new_client; struct hashable_client *old_client; - int error; + struct thread_node *new_thread; - error = create_client(client, &new_client); - if (error) - return error; + new_client = create_client(fd, addr, tid); + if (new_client == NULL) + return pr_enomem(); + + new_thread = malloc(sizeof(struct thread_node)); + if (new_thread == NULL) { + free(new_client); + return pr_enomem(); + } + new_thread->tid = tid; rwlock_write_lock(&lock); - HASH_FIND_INT(table, &client->fd, old_client); - if (old_client == NULL) { - errno = 0; - HASH_ADD_INT(table, meat.fd, new_client); - if (errno) { - rwlock_unlock(&lock); - free(new_client); - return -pr_errno(errno, "Couldn't create client"); - } - new_client = NULL; + errno = 0; + HASH_REPLACE(hh, db.clients, meat.fd, sizeof(new_client->meat.fd), + new_client, old_client); + if (errno) { + rwlock_unlock(&lock); + free(new_client); + return -pr_errno(errno, "Client couldn't be stored"); } + if (old_client != NULL) + free(old_client); - rwlock_unlock(&lock); + SLIST_INSERT_HEAD(&db.threads, new_thread, next); - if (new_client != NULL) - free(new_client); + rwlock_unlock(&lock); return 0; } +int +clients_get_addr(int fd, struct sockaddr_storage *addr) +{ + struct hashable_client *client; + int result; + + result = -ENOENT; + rwlock_write_lock(&lock); + + HASH_FIND_INT(db.clients, &fd, client); + if (client != NULL) { + *addr = client->meat.addr; + result = 0; + } + + rwlock_unlock(&lock); + + return result; +} + void clients_update_serial(int fd, serial_t serial) { struct hashable_client *cur_client; rwlock_write_lock(&lock); - HASH_FIND_INT(table, &fd, cur_client); + HASH_FIND_INT(db.clients, &fd, cur_client); if (cur_client == NULL) goto unlock; cur_client->meat.serial_number = serial; + cur_client->meat.serial_number_set = true; unlock: rwlock_unlock(&lock); } -serial_t -clients_get_min_serial(void) +int +clients_get_min_serial(serial_t *result) { - struct hashable_client *current, *ptr; - serial_t result; + struct hashable_client *current, *tmp; + int retval; - rwlock_write_lock(&lock); - if (HASH_COUNT(table) == 0) + retval = -ENOENT; + rwlock_read_lock(&lock); + if (HASH_COUNT(db.clients) == 0) goto unlock; - min_serial = table->meat.serial_number; - HASH_ITER(hh, table, current, ptr) - if (current->meat.serial_number < min_serial) - min_serial = current->meat.serial_number; + HASH_ITER(hh, db.clients, current, tmp) { + if (!current->meat.serial_number_set) + continue; + if (retval) { + *result = current->meat.serial_number; + retval = 0; + } else if (current->meat.serial_number < *result) + *result = current->meat.serial_number; + } unlock: - result = min_serial; rwlock_unlock(&lock); - - return result; + return retval; } void @@ -128,9 +177,9 @@ clients_forget(int fd) rwlock_write_lock(&lock); - HASH_FIND_INT(table, &fd, client); + HASH_FIND_INT(db.clients, &fd, client); if (client != NULL) { - HASH_DEL(table, client); + HASH_DEL(db.clients, client); free(client); } @@ -140,14 +189,14 @@ clients_forget(int fd) int clients_foreach(clients_foreach_cb cb, void *arg) { - struct hashable_client *client; + struct hashable_client *client, *tmp; int error; error = rwlock_read_lock(&lock); if (error) return error; - for (client = table; client != NULL; client = client->hh.next) { + HASH_ITER(hh, db.clients, client, tmp) { error = cb(&client->meat, arg); if (error) break; @@ -158,15 +207,29 @@ clients_foreach(clients_foreach_cb cb, void *arg) return error; } +/* + * Destroy the clients DB, the @join_thread_cb will be made for each thread + * that was started by the parent process (@arg will be sent at that call). + */ void -clients_db_destroy(void) +clients_db_destroy(join_thread_cb cb, void *arg) { struct hashable_client *node, *tmp; + struct thread_node *thread; - HASH_ITER(hh, table, node, tmp) { - HASH_DEL(table, node); + HASH_ITER(hh, db.clients, node, tmp) { + HASH_DEL(db.clients, node); free(node); } + while (!SLIST_EMPTY(&db.threads)) { + thread = SLIST_FIRST(&db.threads); + SLIST_REMOVE_HEAD(&db.threads, next); + + /* Not much to do on failure */ + cb(thread->tid, arg); + free(thread); + } + pthread_rwlock_destroy(&lock); /* Nothing to do with error code */ } diff --git a/src/clients.h b/src/clients.h index be95e575..73feafdc 100644 --- a/src/clients.h +++ b/src/clients.h @@ -1,12 +1,15 @@ #ifndef SRC_CLIENTS_H_ #define SRC_CLIENTS_H_ +#include #include +#include #include "rtr/pdu.h" #include "rtr/db/vrp.h" struct client { int fd; + struct sockaddr_storage addr; serial_t serial_number; bool serial_number_set; @@ -14,13 +17,15 @@ struct client { int clients_db_init(void); -int clients_add(struct rtr_client *); +int clients_add(int, struct sockaddr_storage, pthread_t); void clients_update_serial(int, serial_t); void clients_forget(int); typedef int (*clients_foreach_cb)(struct client const *, void *); int clients_foreach(clients_foreach_cb, void *); -serial_t clients_get_min_serial(void); +int clients_get_min_serial(serial_t *); +int clients_get_addr(int, struct sockaddr_storage *); -void clients_db_destroy(void); +typedef int (*join_thread_cb)(pthread_t, void *); +void clients_db_destroy(join_thread_cb, void *); #endif /* SRC_CLIENTS_H_ */ diff --git a/src/main.c b/src/main.c index a26fdec5..4c129c4b 100644 --- a/src/main.c +++ b/src/main.c @@ -17,14 +17,9 @@ start_rtr_server(void) error = vrps_init(); if (error) goto just_quit; - error = clients_db_init(); - if (error) - goto revert_vrps; error = rtr_listen(); - clients_db_destroy(); -revert_vrps: vrps_destroy(); just_quit: return error; diff --git a/src/rtr/db/vrps.c b/src/rtr/db/vrps.c index b32296c2..117fcb86 100644 --- a/src/rtr/db/vrps.c +++ b/src/rtr/db/vrps.c @@ -32,11 +32,7 @@ struct state { /** ROA changes to @base over time. */ struct deltas_db deltas; - /* - * TODO (whatever) Maybe rename this. It's not really the current - * serial; it's the next one. - */ - serial_t current_serial; + serial_t next_serial; uint16_t v0_session_id; uint16_t v1_session_id; time_t last_modified_date; @@ -65,7 +61,7 @@ vrps_init(void) * "desynchronization" (more at RFC 6810 'Glossary' and * 'Fields of a PDU') */ - state.current_serial = START_SERIAL; + state.next_serial = START_SERIAL; /* Get the bits that'll fit in session_id */ state.v0_session_id = time(NULL) & 0xFFFF; @@ -153,11 +149,34 @@ __perform_standalone_validation(struct roa_table **result) return 0; } +/* + * Make an empty dummy delta array. + * It's annoying, but temporary. (Until it expires.) Otherwise, it'd be a pain + * to have to check NULL delta_group.deltas all the time. + */ +static int +create_empty_delta(struct deltas **deltas) +{ + struct delta_group deltas_node; + int error; + + error = deltas_create(deltas); + if (error) + return error; + + deltas_node.serial = state.next_serial; + deltas_node.deltas = *deltas; + error = deltas_db_add(&state.deltas, &deltas_node); + if (error) + deltas_refput(*deltas); + return error; +} + /** * Reallocate the array of @db starting at @start, the length and capacity are * calculated according to the new start. */ -static void +static int resize_deltas_db(struct deltas_db *db, struct delta_group *start) { struct delta_group *tmp, *ptr; @@ -166,38 +185,48 @@ resize_deltas_db(struct deltas_db *db, struct delta_group *start) while (db->len < db->capacity / 2) db->capacity /= 2; tmp = malloc(sizeof(struct delta_group) * db->capacity); - if (tmp == NULL) { - pr_enomem(); - return; - } + if (tmp == NULL) + return pr_enomem(); + memcpy(tmp, start, db->len * sizeof(struct delta_group)); /* Release memory allocated */ for (ptr = db->array; ptr < start; ptr++) deltas_refput(ptr->deltas); free(db->array); db->array = tmp; + + return 0; } -static void -vrps_purge(void) +/* + * Lock must be requested before calling this function + */ +static int +vrps_purge(struct deltas **deltas) { struct delta_group *group; array_index i; - uint32_t min_serial; + serial_t min_serial; - min_serial = clients_get_min_serial(); + if (clients_get_min_serial(&min_serial) != 0) { + /* Nobody will need deltas, just leave an empty one */ + deltas_refput(*deltas); + deltas_db_cleanup(&state.deltas, deltagroup_cleanup); + deltas_db_init(&state.deltas); + return create_empty_delta(deltas); + } - /* Assume is ordered by serial, so get the new initial pointer */ + /* Assume its ordered by serial, so get the new initial pointer */ ARRAYLIST_FOREACH(&state.deltas, group, i) if (group->serial >= min_serial) break; - /* Is the first element or reached end, nothing to purge */ + /* Its the first element or reached end, nothing to purge */ if (group == state.deltas.array || (group - state.deltas.array) == state.deltas.len) - return; + return 0; - resize_deltas_db(&state.deltas, group); + return resize_deltas_db(&state.deltas, group); } int @@ -207,6 +236,7 @@ vrps_update(bool *changed) struct roa_table *new_base; struct deltas *deltas; /* Deltas in raw form */ struct delta_group deltas_node; /* Deltas in database node form */ + serial_t min_serial; int error; *changed = false; @@ -237,12 +267,15 @@ vrps_update(bool *changed) goto revert_deltas; /* error == 0 is good */ } - deltas_node.serial = state.current_serial; - deltas_node.deltas = deltas; - error = deltas_db_add(&state.deltas, &deltas_node); - if (error) { - rwlock_unlock(&lock); - goto revert_deltas; + /* Just store deltas if someone will care about it */ + if (clients_get_min_serial(&min_serial) == 0) { + deltas_node.serial = state.next_serial; + deltas_node.deltas = deltas; + error = deltas_db_add(&state.deltas, &deltas_node); + if (error) { + rwlock_unlock(&lock); + goto revert_deltas; + } } /* @@ -252,33 +285,22 @@ vrps_update(bool *changed) old_base = state.base; /* Remove unnecessary deltas */ - vrps_purge(); - - } else { - /* - * Make an empty dummy delta array. - * It's annoying, but temporary. (Until it expires.) - * Otherwise, it'd be a pain to have to check NULL - * delta_group.deltas all the time. - */ - error = deltas_create(&deltas); + error = vrps_purge(&deltas); if (error) { rwlock_unlock(&lock); goto revert_base; } - - deltas_node.serial = state.current_serial; - deltas_node.deltas = deltas; - error = deltas_db_add(&state.deltas, &deltas_node); + } else { + error = create_empty_delta(&deltas); if (error) { rwlock_unlock(&lock); - goto revert_deltas; + goto revert_base; } } *changed = true; state.base = new_base; - state.current_serial++; + state.next_serial++; rwlock_unlock(&lock); @@ -312,7 +334,7 @@ vrps_foreach_base_roa(vrp_foreach_cb cb, void *arg, serial_t *serial) if (state.base != NULL) { error = roa_table_foreach_roa(state.base, cb, arg); - *serial = state.current_serial - 1; + *serial = state.next_serial - 1; } else { error = -EAGAIN; } @@ -387,7 +409,7 @@ get_last_serial_number(serial_t *result) return error; if (state.base != NULL) - *result = state.current_serial - 1; + *result = state.next_serial - 1; else error = -EAGAIN; diff --git a/src/rtr/pdu.h b/src/rtr/pdu.h index 5d07b0f2..822ce977 100644 --- a/src/rtr/pdu.h +++ b/src/rtr/pdu.h @@ -10,11 +10,6 @@ #define RTR_V0 0 #define RTR_V1 1 -struct rtr_client { - int fd; - struct sockaddr_storage addr; -}; - /** A request from an RTR client. */ struct rtr_request { /** Raw bytes. */ diff --git a/src/rtr/rtr.c b/src/rtr/rtr.c index f184070c..22ff38c1 100644 --- a/src/rtr/rtr.c +++ b/src/rtr/rtr.c @@ -10,7 +10,6 @@ #include #include #include -#include #include "config.h" #include "clients.h" @@ -21,14 +20,12 @@ struct sigaction act; -struct thread_node { +struct thread_param { + int fd; pthread_t tid; - struct rtr_client client; - SLIST_ENTRY(thread_node) next; + struct sockaddr_storage addr; }; -SLIST_HEAD(thread_list, thread_node) threads; - static int init_addrinfo(struct addrinfo **result) { @@ -172,25 +169,30 @@ clean_request(struct rtr_request *request, const struct pdu_metadata *meta) } static void -print_close_failure(int error, struct sockaddr_storage *sockaddr) +print_close_failure(int error, int fd) { + struct sockaddr_storage sockaddr; char buffer[INET6_ADDRSTRLEN]; void *addr = NULL; char const *addr_str; - switch (sockaddr->ss_family) { + if (clients_get_addr(fd, &sockaddr) != 0) { + addr_str = "(unknown)"; + goto done; + } + switch (sockaddr.ss_family) { case AF_INET: - addr = &((struct sockaddr_in *) sockaddr)->sin_addr; + addr = &((struct sockaddr_in *) &sockaddr)->sin_addr; break; case AF_INET6: - addr = &((struct sockaddr_in6 *) sockaddr)->sin6_addr; + addr = &((struct sockaddr_in6 *) &sockaddr)->sin6_addr; break; default: addr_str = "(protocol unknown)"; goto done; } - addr_str = inet_ntop(sockaddr->ss_family, addr, buffer, + addr_str = inet_ntop(sockaddr.ss_family, addr, buffer, INET6_ADDRSTRLEN); if (addr_str == NULL) addr_str = "(unprintable address)"; @@ -199,39 +201,48 @@ done: pr_errno(error, "close() failed on socket of client %s", addr_str); } -static void * -end_client(struct rtr_client *client) +static void +end_client(int fd) { - if (close(client->fd) != 0) - print_close_failure(errno, &client->addr); - - clients_forget(client->fd); - return NULL; + if (close(fd) != 0) + print_close_failure(errno, fd); } /* * The client socket threads' entry routine. + * @arg must be released. */ static void * client_thread_cb(void *arg) { - struct rtr_client *client = arg; struct pdu_metadata const *meta; struct rtr_request request; + struct thread_param param; int error; + memcpy(¶m, arg, sizeof(param)); + free(arg); + + error = clients_add(param.fd, param.addr, param.tid); + if (error) { + close(param.fd); + return NULL; + } while (true) { /* For each PDU... */ - error = pdu_load(client->fd, &request, &meta); + error = pdu_load(param.fd, &request, &meta); if (error) break; - error = meta->handle(client->fd, &request); + error = meta->handle(param.fd, &request); clean_request(&request, meta); if (error) break; } - return end_client(client); + end_client(param.fd); + clients_forget(param.fd); + + return NULL; } /* @@ -241,7 +252,7 @@ static int handle_client_connections(int server_fd) { struct sockaddr_storage client_addr; - struct thread_node *new_thread; + struct thread_param *param; socklen_t sizeof_client_addr; int client_fd; int error; @@ -269,49 +280,26 @@ handle_client_connections(int server_fd) * So don't interrupt the thread when this happens. */ - /* - * TODO this is more complicated than it needs to be. - * We have a client hash table and a thread linked list. - * These two contain essentially the same entries. It's - * redundant. - */ - - new_thread = malloc(sizeof(struct thread_node)); - if (new_thread == NULL) { + param = malloc(sizeof(struct thread_param)); + if (param == NULL) { /* No error response PDU on memory allocation. */ - pr_err("Couldn't create thread struct"); + pr_err("Couldn't create thread parameters struct"); close(client_fd); continue; } + param->fd = client_fd; + param->addr = client_addr; - new_thread->client.fd = client_fd; - new_thread->client.addr = client_addr; - - error = clients_add(&new_thread->client); - if (error) { - /* - * Presently, clients_add() can only fail due to alloc - * failure. No error report PDU. - */ - free(new_thread); - close(client_fd); - continue; - } - - error = pthread_create(&new_thread->tid, NULL, - client_thread_cb, &new_thread->client); + error = pthread_create(¶m->tid, NULL, + client_thread_cb, param); if (error && error != EAGAIN) err_pdu_send_internal_error(client_fd); if (error) { pr_errno(error, "Could not spawn the client's thread"); - clients_forget(client_fd); - free(new_thread); close(client_fd); - continue; + free(param); } - SLIST_INSERT_HEAD(&threads, new_thread, next); - } while (true); return 0; /* Unreachable. */ @@ -341,22 +329,28 @@ init_signal_handler(void) return error; } -/* Terminates client threads as gracefully as I know how to. */ +/* + * Receive @arg to be called as a clients_foreach_cb + */ +static int +kill_client(struct client const *client, void *arg) +{ + end_client(client->fd); + /* Don't call clients_forget to avoid deadlock! */ + return 0; +} + static void -wait_threads(void) +end_clients(void) { - struct thread_node *ptr; + clients_foreach(kill_client, NULL); + /* Let the clients be deleted when clients DB is destroyed */ +} - while (!SLIST_EMPTY(&threads)) { - ptr = SLIST_FIRST(&threads); - SLIST_REMOVE_HEAD(&threads, next); - /* - * If the close fails, the thread might still be using the - * thread_param variables, so leak instead. - */ - if (close_thread(ptr->tid, "Client") == 0) - free(ptr); - } +static int +join_thread(pthread_t tid, void *arg) +{ + return close_thread(tid, "Client"); } /* @@ -375,13 +369,14 @@ rtr_listen(void) if (error) return error; - /* Server ready, start updates thread */ - error = updates_daemon_start(); + /* Server ready, start everything else */ + error = clients_db_init(); if (error) goto revert_server_socket; - /* Init global vars */ - SLIST_INIT(&threads); + error = updates_daemon_start(); + if (error) + goto revert_clients_db; error = init_signal_handler(); if (error) @@ -389,9 +384,11 @@ rtr_listen(void) error = handle_client_connections(server_fd); - wait_threads(); + end_clients(); revert_updates_daemon: updates_daemon_destroy(); +revert_clients_db: + clients_db_destroy(join_thread, NULL); revert_server_socket: close(server_fd); return error;