From: pcarana Date: Tue, 24 Nov 2020 00:20:40 +0000 (-0600) Subject: Use thread pool for RTR server/clients, validation cycles at main thread X-Git-Tag: v1.5.0~13 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=88f75a85a7c30fc906f47872e9b6acdaee3170de;p=thirdparty%2FFORT-validator.git Use thread pool for RTR server/clients, validation cycles at main thread +Change the previous logic: RTR server lived at the main thread and the validation cycles were run in a distinct thread. Now the validation cycles are run at the main thread, and RTR server is spawned at a new thread. +Create internal thread pool to handle RTR server task and delete RRDP dirs tasks. +Create thread pool to handle incoming RTR clients. One thread is utilized per client. +Create args: 'thread-pool.server.max' (spawned threads to attend RTR clients) and 'thread-pool.validation.max' (spawned threads to run validation cycles). +Shutdown all living client sockets when the application ends its execution. +Rename 'updates_daemon.*' to 'validation_run.*'. --- diff --git a/src/Makefile.am b/src/Makefile.am index cf538093..e0416931 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -19,6 +19,7 @@ fort_SOURCES += delete_dir_daemon.h delete_dir_daemon.c fort_SOURCES += extension.h extension.c fort_SOURCES += file.h file.c fort_SOURCES += init.h init.c +fort_SOURCES += internal_pool.h internal_pool.c fort_SOURCES += json_parser.c json_parser.h fort_SOURCES += line_file.h line_file.c fort_SOURCES += log.h log.c @@ -33,10 +34,10 @@ fort_SOURCES += sorted_array.h sorted_array.c fort_SOURCES += state.h state.c fort_SOURCES += str_token.h str_token.c fort_SOURCES += thread_var.h thread_var.c -fort_SOURCES += updates_daemon.c updates_daemon.h fort_SOURCES += uri.h uri.c fort_SOURCES += json_handler.h json_handler.c fort_SOURCES += validation_handler.h validation_handler.c +fort_SOURCES += validation_run.h validation_run.c fort_SOURCES += visited_uris.h visited_uris.c fort_SOURCES += asn1/content_info.h asn1/content_info.c diff --git a/src/clients.c b/src/clients.c index 7288ca2b..9ce2f3fb 100644 --- a/src/clients.c +++ b/src/clients.c @@ -32,7 +32,7 @@ clients_db_init(void) } static struct hashable_client * -create_client(int fd, struct sockaddr_storage addr, pthread_t tid) +create_client(int fd, struct sockaddr_storage addr) { struct hashable_client *client; @@ -46,7 +46,6 @@ create_client(int fd, struct sockaddr_storage addr, pthread_t tid) client->meat.serial_number_set = false; client->meat.rtr_version_set = false; client->meat.addr = addr; - client->meat.tid = tid; return client; } @@ -55,12 +54,12 @@ create_client(int fd, struct sockaddr_storage addr, pthread_t tid) * If the client whose file descriptor is @fd isn't already stored, store it. */ int -clients_add(int fd, struct sockaddr_storage addr, pthread_t tid) +clients_add(int fd, struct sockaddr_storage addr) { struct hashable_client *new_client; struct hashable_client *old_client; - new_client = create_client(fd, addr, tid); + new_client = create_client(fd, addr); if (new_client == NULL) return pr_enomem(); @@ -82,26 +81,6 @@ clients_add(int fd, struct sockaddr_storage addr, pthread_t tid) return 0; } -int -clients_get_addr(int fd, struct sockaddr_storage *addr) -{ - struct hashable_client *client; - int result; - - result = -ENOENT; - rwlock_read_lock(&lock); - - HASH_FIND_INT(db.clients, &fd, client); - if (client != NULL) { - *addr = client->meat.addr; - result = 0; - } - - rwlock_unlock(&lock); - - return result; -} - void clients_update_serial(int fd, serial_t serial) { @@ -193,8 +172,12 @@ clients_get_rtr_version_set(int fd, bool *is_set, uint8_t *rtr_version) return result; } +/* + * Remove the client with ID @fd from the DB. If a @cb is set, it will be + * called before deleting the client from the DB. + */ void -clients_forget(int fd) +clients_forget(int fd, clients_foreach_cb cb, void *arg) { struct hashable_client *client; @@ -203,6 +186,9 @@ clients_forget(int fd) HASH_FIND_INT(db.clients, &fd, client); if (client != NULL) { HASH_DEL(db.clients, client); + /* Nothing to do at errors */ + if (cb != NULL) + cb(&client->meat, arg); free(client); } @@ -231,17 +217,41 @@ clients_foreach(clients_foreach_cb cb, void *arg) } /* - * Destroy the clients DB, the @join_thread_cb will be made for each thread - * that was started by the parent process (@arg will be sent at that call). + * Terminate all clients and remove them from DB. + * + * @cb (with @arg) will be called before deleting the corresponding client from + * the DB. + */ +int +clients_terminate_all(clients_foreach_cb cb, void *arg) +{ + struct hashable_client *node, *tmp; + int error; + + rwlock_write_lock(&lock); + + HASH_ITER(hh, db.clients, node, tmp) { + error = cb(&node->meat, arg); + if (error) + break; + HASH_DEL(db.clients, node); + free(node); + } + + rwlock_unlock(&lock); + + return error; +} + +/* + * Destroy the clients DB. */ void -clients_db_destroy(join_thread_cb cb, void *arg) +clients_db_destroy(void) { struct hashable_client *node, *tmp; HASH_ITER(hh, db.clients, node, tmp) { - /* Not much to do on failure */ - cb(node->meat.tid, arg); HASH_DEL(db.clients, node); free(node); } diff --git a/src/clients.h b/src/clients.h index 18e00dd8..b22792ab 100644 --- a/src/clients.h +++ b/src/clients.h @@ -1,7 +1,6 @@ #ifndef SRC_CLIENTS_H_ #define SRC_CLIENTS_H_ -#include #include #include #include "rtr/pdu.h" @@ -10,11 +9,6 @@ struct client { int fd; struct sockaddr_storage addr; - /* - * The join should be made when the db is cleared, so the main process - * should do it. - */ - pthread_t tid; serial_t serial_number; bool serial_number_set; @@ -25,18 +19,19 @@ struct client { int clients_db_init(void); -int clients_add(int, struct sockaddr_storage, pthread_t); +int clients_add(int, struct sockaddr_storage); void clients_update_serial(int, serial_t); -void clients_forget(int); + typedef int (*clients_foreach_cb)(struct client *, void *); +void clients_forget(int, clients_foreach_cb, void *); int clients_foreach(clients_foreach_cb, void *); + int clients_get_min_serial(serial_t *); -int clients_get_addr(int, struct sockaddr_storage *); int clients_set_rtr_version(int, uint8_t); int clients_get_rtr_version_set(int, bool *, uint8_t *); -typedef int (*join_thread_cb)(pthread_t, void *); -void clients_db_destroy(join_thread_cb, void *); +int clients_terminate_all(clients_foreach_cb, void *); +void clients_db_destroy(void); #endif /* SRC_CLIENTS_H_ */ diff --git a/src/config.c b/src/config.c index d77935b3..23e90b6f 100644 --- a/src/config.c +++ b/src/config.c @@ -74,7 +74,6 @@ struct rpki_config { char *port; /** Outstanding connections in the socket's listen queue */ unsigned int backlog; - struct { /** Interval used to look for updates at VRPs location */ unsigned int validation; @@ -206,6 +205,18 @@ struct rpki_config { /* HTTPS URLS from where the TALS will be fetched */ struct init_locations init_tal_locations; + + /* Thread pools for specific tasks */ + struct { + /* Threads related to RTR server */ + struct { + unsigned int max; + } server; + /* Threads related to validation cycles */ + struct { + unsigned int max; + } validation; + } thread_pool; }; static void print_usage(FILE *, bool); @@ -754,6 +765,27 @@ static const struct option_field options[] = { .availability = AVAILABILITY_JSON, }, + { + .id = 12000, + .name = "thread-pool.server.max", + .type = >_uint, + .offset = offsetof(struct rpki_config, thread_pool.server.max), + .doc = "Maximum number of active threads (one thread per RTR client) that can live at the thread pool", + .min = 1, + /* Would somebody connect more than 400 routers? */ + .max = 400, + }, + { + .id = 12001, + .name = "thread-pool.validation.max", + .type = >_uint, + .offset = offsetof(struct rpki_config, + thread_pool.validation.max), + .doc = "Maximum number of active threads (one thread per TAL) that can live at the thread pool", + .min = 1, + .max = 20, + }, + { 0 }, }; @@ -1037,6 +1069,11 @@ set_default_values(void) if (error) goto revert_init_locations; + /* Common scenario is to connect 1 router or a couple of them */ + rpki_config.thread_pool.server.max = 20; + /* Usually 5 TALs, let a few more available */ + rpki_config.thread_pool.validation.max = 10; + return 0; revert_init_locations: free(rpki_config.validation_log.tag); @@ -1541,6 +1578,18 @@ config_get_stale_repository_period(void) return rpki_config.stale_repository_period; } +unsigned int +config_get_thread_pool_server_max(void) +{ + return rpki_config.thread_pool.server.max; +} + +unsigned int +config_get_thread_pool_validation_max(void) +{ + return rpki_config.thread_pool.validation.max; +} + void config_set_rsync_enabled(bool value) { diff --git a/src/config.h b/src/config.h index 0a8c8d64..ab8b3742 100644 --- a/src/config.h +++ b/src/config.h @@ -51,6 +51,8 @@ char const *config_get_output_roa(void); char const *config_get_output_bgpsec(void); unsigned int config_get_asn1_decode_max_stack(void); unsigned int config_get_stale_repository_period(void); +unsigned int config_get_thread_pool_server_max(void); +unsigned int config_get_thread_pool_validation_max(void); /* Logging getters */ bool config_get_op_log_enabled(void); diff --git a/src/delete_dir_daemon.c b/src/delete_dir_daemon.c index 92f25843..009f377e 100644 --- a/src/delete_dir_daemon.c +++ b/src/delete_dir_daemon.c @@ -5,11 +5,11 @@ #include #include #include -#include #include #include #include #include "common.h" +#include "internal_pool.h" #include "log.h" #include "random.h" @@ -80,9 +80,8 @@ remove_from_root(void *arg) dirs_arr = root_arg->arr; len = root_arg->arr_set; - /* Release received arg, and detach thread */ + /* Release received arg */ free(root_arg); - pthread_detach(pthread_self()); for (i = 0; i < len; i++) { error = nftw(dirs_arr[i], traverse, MAX_FD_ALLOWED, @@ -276,7 +275,6 @@ rem_dirs_destroy(struct rem_dirs *rem_dirs) int delete_dir_daemon_start(char **roots, size_t roots_len, char const *workspace) { - pthread_t thread; struct rem_dirs *arg; int error; @@ -291,12 +289,11 @@ delete_dir_daemon_start(char **roots, size_t roots_len, char const *workspace) return error; } - /* Thread arg is released at thread before being detached */ - errno = pthread_create(&thread, NULL, remove_from_root, (void *) arg); - if (errno) { + /* Thread arg is released at thread */ + error = internal_pool_push(remove_from_root, (void *) arg); + if (error) { rem_dirs_destroy(arg); - return pr_op_errno(errno, - "Could not spawn the delete dir daemon thread"); + return error; } return 0; diff --git a/src/internal_pool.c b/src/internal_pool.c new file mode 100644 index 00000000..5a0fcd4d --- /dev/null +++ b/src/internal_pool.c @@ -0,0 +1,40 @@ +#include "internal_pool.h" + +#include + +/* + * This is a basic wrapper for thread_pool functions, but the allocated pool + * lives at the main thread. + * + * Additional threads that must be spawned during execution (those that aren't + * related to the validation or server thread pool tasks) can be pushed here. + */ + +#define INTERNAL_POOL_MAX 5 + +struct thread_pool *pool; + +int +internal_pool_init(void) +{ + int error; + + pool = NULL; + error = thread_pool_create(INTERNAL_POOL_MAX, &pool); + if (error) + return error; + + return 0; +} + +int +internal_pool_push(thread_pool_task_cb cb, void *arg) +{ + return thread_pool_push(pool, cb, arg); +} + +void +internal_pool_cleanup(void) +{ + thread_pool_destroy(pool); +} diff --git a/src/internal_pool.h b/src/internal_pool.h new file mode 100644 index 00000000..77fc1fcb --- /dev/null +++ b/src/internal_pool.h @@ -0,0 +1,10 @@ +#ifndef SRC_INTERNAL_POOL_H_ +#define SRC_INTERNAL_POOL_H_ + +#include "thread/thread_pool.h" + +int internal_pool_init(void); +int internal_pool_push(thread_pool_task_cb, void *); +void internal_pool_cleanup(void); + +#endif /* SRC_INTERNAL_POOL_H_ */ diff --git a/src/main.c b/src/main.c index fd91f86b..c0b99feb 100644 --- a/src/main.c +++ b/src/main.c @@ -2,6 +2,7 @@ #include "config.h" #include "debug.h" #include "extension.h" +#include "internal_pool.h" #include "nid.h" #include "reqs_errors.h" #include "thread_var.h" @@ -68,13 +69,19 @@ __main(int argc, char **argv) if (error) goto revert_nid; - error = relax_ng_init(); + error = internal_pool_init(); if (error) goto revert_http; + error = relax_ng_init(); + if (error) + goto revert_pool; + error = start_rtr_server(); relax_ng_cleanup(); +revert_pool: + internal_pool_cleanup(); revert_http: http_cleanup(); revert_nid: diff --git a/src/rtr/db/vrps.c b/src/rtr/db/vrps.c index 7e89728a..ceaa46c3 100644 --- a/src/rtr/db/vrps.c +++ b/src/rtr/db/vrps.c @@ -86,9 +86,9 @@ vrps_init(void) time_t now; int error; - /* FIXME (now) Configure max threads for tals pool */ pool = NULL; - error = thread_pool_create(10, &pool); + error = thread_pool_create(config_get_thread_pool_validation_max(), + &pool); if (error) return error; diff --git a/src/rtr/primitive_reader.c b/src/rtr/primitive_reader.c index 112ac6cd..60569bf1 100644 --- a/src/rtr/primitive_reader.c +++ b/src/rtr/primitive_reader.c @@ -17,7 +17,7 @@ static void place_null_character(rtr_char *, size_t); * Reads exactly @buffer_len bytes from @buffer, erroring if this goal cannot be * met. * - * If @allow_end is true, will allow immediate EOF in place of the buffer bytes. + * If @allow_eof is true, will allow immediate EOF in place of the buffer bytes. * (Though all this really means is that the corresponding warning message will * not be printed, which is perfectly fine as far as the only current caller is * concerned.) @@ -33,7 +33,8 @@ read_exact(int fd, unsigned char *buffer, size_t buffer_len, bool allow_eof) for (offset = 0; offset < buffer_len; offset += read_result) { read_result = read(fd, &buffer[offset], buffer_len - offset); if (read_result == -1) - return -pr_op_errno(errno, "Client socket read interrupted"); + return -pr_op_errno(errno, + "Client socket read interrupted"); if (read_result == 0) { if (!allow_eof) diff --git a/src/rtr/rtr.c b/src/rtr/rtr.c index 6dd70e65..105cc4e8 100644 --- a/src/rtr/rtr.c +++ b/src/rtr/rtr.c @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -15,16 +14,22 @@ #include "config.h" #include "clients.h" +#include "internal_pool.h" #include "log.h" -#include "updates_daemon.h" +#include "validation_run.h" #include "rtr/err_pdu.h" #include "rtr/pdu.h" #include "rtr/db/vrps.h" +#include "thread/thread_pool.h" + +/* Constant messages regarding a client status */ +#define CL_ACCEPTED "accepted" +#define CL_CLOSED "closed" +#define CL_TERMINATED "terminated" /* Parameters for each thread that handles client connections */ struct thread_param { int fd; - pthread_t tid; struct sockaddr_storage addr; }; @@ -37,6 +42,15 @@ struct fd_node { /* List of server sockets */ SLIST_HEAD(server_fds, fd_node); +/* Does the server needs to be stopped? */ +static volatile bool server_stop; + +/* Parameters for the RTR server task */ +struct rtr_task_param { + struct server_fds *fds; + struct thread_pool *pool; +}; + static int init_addrinfo(char const *hostname, char const *service, struct addrinfo **result) @@ -217,7 +231,7 @@ server_fd_add(struct server_fds *fds, char const *address, char const *service) } static void -server_fd_cleanup(struct server_fds *fds) +server_fds_destroy(struct server_fds *fds) { struct fd_node *fd; @@ -227,6 +241,7 @@ server_fd_cleanup(struct server_fds *fds) close(fd->id); free(fd); } + free(fds); } static int @@ -298,20 +313,17 @@ create_server_sockets(struct server_fds *fds) error = parse_address(addresses->array[i], default_service, &address, &service); if (error) - goto cleanup_fds; + return error; error = server_fd_add(fds, address, service); /* Always release them */ free(address); free(service); if (error) - goto cleanup_fds; + return error; } return 0; -cleanup_fds: - server_fd_cleanup(fds); - return error; } enum verdict { @@ -378,25 +390,16 @@ clean_request(struct rtr_request *request, const struct pdu_metadata *meta) meta->destructor(request->pdu); } -static void -print_close_failure(int error, int fd) +static int +print_close_failure(int error, struct sockaddr_storage *sockaddr) { - struct sockaddr_storage sockaddr; char buffer[INET6_ADDRSTRLEN]; char const *addr_str; - addr_str = (clients_get_addr(fd, &sockaddr) == 0) - ? sockaddr2str(&sockaddr, buffer) - : "(unknown)"; + addr_str = sockaddr2str(sockaddr, buffer); - pr_op_errno(error, "close() failed on socket of client %s", addr_str); -} - -static void -end_client(int fd) -{ - if (close(fd) != 0) - print_close_failure(errno, fd); + return pr_op_errno(error, "close() failed on socket of client %s", + addr_str); } static void @@ -407,6 +410,19 @@ print_client_addr(struct sockaddr_storage *addr, char const *action, int fd) sockaddr2str(addr, buffer)); } +static int +end_client(struct client *client, void *arg) +{ + if (arg != NULL && strcmp(arg, CL_TERMINATED) == 0) + shutdown(client->fd, SHUT_RDWR); + + if (close(client->fd) != 0) + return print_close_failure(errno, &client->addr); + + print_client_addr(&(client->addr), arg, client->fd); + return 0; +} + /* * The client socket threads' entry routine. * @arg must be released. @@ -422,7 +438,7 @@ client_thread_cb(void *arg) memcpy(¶m, arg, sizeof(param)); free(arg); - error = clients_add(param.fd, param.addr, param.tid); + error = clients_add(param.fd, param.addr); if (error) { close(param.fd); return NULL; @@ -439,12 +455,7 @@ client_thread_cb(void *arg) break; } - print_client_addr(¶m.addr, "closed", param.fd); - end_client(param.fd); - clients_forget(param.fd); - - /* Release to avoid the wait till the parent tries to join */ - pthread_detach(param.tid); + clients_forget(param.fd, end_client, CL_CLOSED); return NULL; } @@ -454,20 +465,20 @@ init_fdset(struct server_fds *fds, fd_set *fdset) { struct fd_node *node; - FD_ZERO (fdset); + FD_ZERO(fdset); SLIST_FOREACH(node, fds, next) - FD_SET (node->id, fdset); - + FD_SET(node->id, fdset); } /* * Waits for client connections and spawns threads to handle them. */ -static int -handle_client_connections(struct server_fds *fds) +static void * +handle_client_connections(void *arg) { - struct fd_node *head_node; - struct sigaction ign; + struct rtr_task_param *rtr_param = arg; + struct server_fds *fds; + struct thread_pool *pool; struct sockaddr_storage client_addr; struct thread_param *param; struct timeval select_time; @@ -478,30 +489,28 @@ handle_client_connections(struct server_fds *fds) int fd; int error; - /* Ignore SIGPIPES, they're handled apart */ - ign.sa_handler = SIG_IGN; - ign.sa_flags = 0; - sigemptyset(&ign.sa_mask); - sigaction(SIGPIPE, &ign, NULL); - - head_node = SLIST_FIRST(fds); - last_server_fd = head_node->id; + /* Get the argument pointers, and release arg at once */ + fds = rtr_param->fds; + pool = rtr_param->pool; + free(rtr_param); - SLIST_FOREACH(head_node, fds, next) { - error = listen(head_node->id, config_get_server_queue()); - if (error) - return pr_op_errno(errno, - "Couldn't listen on server socket."); - } + last_server_fd = SLIST_FIRST(fds)->id; sizeof_client_addr = sizeof(client_addr); + /* I'm alive! */ + server_stop = false; + pr_op_debug("Waiting for client connections at server..."); do { /* Look for connections every .2 seconds*/ select_time.tv_sec = 0; select_time.tv_usec = 200000; + /* Am I still alive? */ + if (server_stop) + break; + init_fdset(fds, &readfds); if (select(last_server_fd + 1, &readfds, NULL, NULL, @@ -523,10 +532,10 @@ handle_client_connections(struct server_fds *fds) case VERDICT_RETRY: continue; case VERDICT_EXIT: - return -EINVAL; + return NULL; } - print_client_addr(&client_addr, "accepted", client_fd); + print_client_addr(&client_addr, CL_ACCEPTED, client_fd); /* * Note: My gut says that errors from now on (even the @@ -545,46 +554,56 @@ handle_client_connections(struct server_fds *fds) param->fd = client_fd; param->addr = client_addr; - error = pthread_create(¶m->tid, NULL, - client_thread_cb, param); - if (error && error != EAGAIN) + error = thread_pool_push(pool, client_thread_cb, + param); + if (error) { + pr_op_err("Couldn't push a thread to attend incoming RTR client"); /* Error with min RTR version */ err_pdu_send_internal_error(client_fd, RTR_V0); - if (error) { - pr_op_errno(error, - "Could not spawn the client's thread"); close(client_fd); free(param); } } } while (true); - return 0; /* Unreachable. */ + return NULL; /* Unreachable. */ } -/* - * Receive @arg to be called as a clients_foreach_cb - */ static int -kill_client(struct client *client, void *arg) +__handle_client_connections(struct server_fds *fds, struct thread_pool *pool) { - end_client(client->fd); - print_client_addr(&(client->addr), "terminated", client->fd); - /* Don't call clients_forget to avoid deadlock! */ - return 0; -} + struct rtr_task_param *param; + struct fd_node *node; + struct sigaction ign; + int error; -static void -end_clients(void) -{ - clients_foreach(kill_client, NULL); - /* Let the clients be deleted when clients DB is destroyed */ -} + /* Ignore SIGPIPES, they're handled apart */ + ign.sa_handler = SIG_IGN; + ign.sa_flags = 0; + sigemptyset(&ign.sa_mask); + sigaction(SIGPIPE, &ign, NULL); + + SLIST_FOREACH(node, fds, next) { + error = listen(node->id, config_get_server_queue()); + if (error) + return pr_op_errno(errno, + "Couldn't listen on server socket."); + } + + param = malloc(sizeof(struct rtr_task_param)); + if (param == NULL) + return pr_enomem(); + + param->fds = fds; + param->pool = pool; + + /* handle_client_connections() must release param */ + error = internal_pool_push(handle_client_connections, param); + if (error) { + free(param); + return error; + } -static int -join_thread(pthread_t tid, void *arg) -{ - close_thread(tid, "Client"); return 0; } @@ -599,38 +618,61 @@ join_thread(pthread_t tid, void *arg) int rtr_listen(void) { - bool changed; - struct server_fds fds; /* "file descriptors" */ + struct server_fds *fds; /* "file descriptors" */ + struct thread_pool *pool; int error; + server_stop = true; + error = clients_db_init(); if (error) return error; if (config_get_mode() == STANDALONE) { - error = vrps_update(&changed); - if (error) - pr_op_err("Error %d while trying to update the ROA database.", - error); + error = validation_run_first(); goto revert_clients_db; /* Error 0 it's ok */ } - SLIST_INIT(&fds); - error = create_server_sockets(&fds); - if (error) + fds = malloc(sizeof(struct server_fds)); + if (fds == NULL) { + error = pr_enomem(); goto revert_clients_db; + } + + SLIST_INIT(fds); + error = create_server_sockets(fds); + if (error) + goto revert_server_fds; + + pool = NULL; + error = thread_pool_create(config_get_thread_pool_server_max(), &pool); + if (error) + goto revert_server_fds; - error = updates_daemon_start(); + /* Do the first run */ + error = validation_run_first(); if (error) - goto revert_server_sockets; + goto revert_thread_pool; + + /* Wait for connections at another thread */ + error = __handle_client_connections(fds, pool); + if (error) + goto revert_thread_pool; + + /* Keep running the validations on the main thread */ + error = validation_run_cycle(); + + /* Terminate all clients */ + clients_terminate_all(end_client, CL_TERMINATED); - error = handle_client_connections(&fds); + /* Stop the server (it lives on a detached thread) */ + server_stop = true; - end_clients(); - updates_daemon_destroy(); -revert_server_sockets: - server_fd_cleanup(&fds); +revert_thread_pool: + thread_pool_destroy(pool); +revert_server_fds: + server_fds_destroy(fds); revert_clients_db: - clients_db_destroy(join_thread, NULL); + clients_db_destroy(); return error; } diff --git a/src/thread/thread_pool.c b/src/thread/thread_pool.c index 4cac370e..908e87bc 100644 --- a/src/thread/thread_pool.c +++ b/src/thread/thread_pool.c @@ -121,13 +121,13 @@ tasks_poll(void *arg) while (true) { thread_pool_lock(pool); - while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) { + while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) pthread_cond_wait(&(pool->working_cond), &(pool->lock)); - } + if (pool->stop) break; - /* Pop the tail */ + /* Pull the tail */ task = task_queue_pull(&(pool->queue)); pool->working_count++; pr_op_debug("Working on task #%u", pool->working_count); @@ -195,8 +195,7 @@ tpool_thread_spawn(struct thread_pool *pool, thread_pool_task_cb entry_point) if (error) return pr_op_errno(error, "Calling pthread_attr_init()"); - /* FIXME (now) Let at 2MB? */ - /* 2MB */ + /* Use 2MB (default in most 64 bits systems) */ error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2); if (error) return pr_op_errno(error, @@ -328,9 +327,10 @@ thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg) thread_pool_lock(pool); task_queue_push(&(pool->queue), task); + thread_pool_unlock(pool); + /* There's work to do! */ pthread_cond_broadcast(&(pool->working_cond)); - thread_pool_unlock(pool); return 0; } diff --git a/src/updates_daemon.h b/src/updates_daemon.h deleted file mode 100644 index ea269e5b..00000000 --- a/src/updates_daemon.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef SRC_UPDATES_DAEMON_H_ -#define SRC_UPDATES_DAEMON_H_ - -int updates_daemon_start(void); -void updates_daemon_destroy(void); - -#endif /* SRC_UPDATES_DAEMON_H_ */ diff --git a/src/updates_daemon.c b/src/validation_run.c similarity index 58% rename from src/updates_daemon.c rename to src/validation_run.c index 7adf70f3..9ac9a18b 100644 --- a/src/updates_daemon.c +++ b/src/validation_run.c @@ -1,26 +1,39 @@ -#include "updates_daemon.h" +#include "validation_run.h" -#include #include #include -#include "common.h" #include "config.h" #include "log.h" #include "notify.h" -#include "object/tal.h" #include "rtr/db/vrps.h" -static pthread_t thread; +/* Runs a single cycle, use at standalone mode or before running RTR server */ +int +validation_run_first(void) +{ + bool upd; + int error; -static void * -check_vrps_updates(void *param_void) + upd = false; + error = vrps_update(&upd); + if (error) + return pr_op_err("First validation wasn't successful."); + + return 0; +} + +/* Run a validation cycle each 'server.interval.validation' secs */ +int +validation_run_cycle(void) { + unsigned int validation_interval; bool changed; int error; + validation_interval = config_get_validation_interval(); do { - sleep(config_get_validation_interval()); + sleep(validation_interval); error = vrps_update(&changed); if (error == -EINTR) @@ -41,29 +54,5 @@ check_vrps_updates(void *param_void) } } while (true); - return NULL; -} - -int -updates_daemon_start(void) -{ - bool changed; - int error; - - error = vrps_update(&changed); - if (error) - return pr_op_err("First validation wasn't successful."); - - errno = pthread_create(&thread, NULL, check_vrps_updates, NULL); - if (errno) - return -pr_op_errno(errno, - "Could not spawn the update daemon thread"); - - return 0; -} - -void -updates_daemon_destroy(void) -{ - close_thread(thread, "Validation"); + return error; } diff --git a/src/validation_run.h b/src/validation_run.h new file mode 100644 index 00000000..7506cdd9 --- /dev/null +++ b/src/validation_run.h @@ -0,0 +1,7 @@ +#ifndef SRC_VALIDATION_RUN_H_ +#define SRC_VALIDATION_RUN_H_ + +int validation_run_first(void); +int validation_run_cycle(void); + +#endif /* SRC_VALIDATION_RUN_H_ */ diff --git a/test/client_test.c b/test/client_test.c index b626920e..efa4a0da 100644 --- a/test/client_test.c +++ b/test/client_test.c @@ -29,13 +29,6 @@ handle_foreach(struct client *client, void *arg) return 0; } -static int -join_threads(pthread_t tid, void *arg) -{ - /* Empty, since no threads are alive */ - return 0; -} - START_TEST(basic_test) { /* @@ -58,19 +51,19 @@ START_TEST(basic_test) */ for (i = 0; i < 4; i++) { - ck_assert_int_eq(0, clients_add(1, addr, 10)); - ck_assert_int_eq(0, clients_add(2, addr, 20)); - ck_assert_int_eq(0, clients_add(3, addr, 30)); - ck_assert_int_eq(0, clients_add(4, addr, 40)); + ck_assert_int_eq(0, clients_add(1, addr)); + ck_assert_int_eq(0, clients_add(2, addr)); + ck_assert_int_eq(0, clients_add(3, addr)); + ck_assert_int_eq(0, clients_add(4, addr)); } - clients_forget(3); + clients_forget(3, NULL, NULL); state = 0; ck_assert_int_eq(0, clients_foreach(handle_foreach, &state)); ck_assert_uint_eq(3, state); - clients_db_destroy(join_threads, NULL); + clients_db_destroy(); } END_TEST diff --git a/test/impersonator.c b/test/impersonator.c index 96fd8a51..a2232315 100644 --- a/test/impersonator.c +++ b/test/impersonator.c @@ -307,3 +307,9 @@ config_set_http_priority(unsigned int value) { http_priority = value; } + +unsigned int +config_get_thread_pool_validation_max(void) +{ + return 10; +} diff --git a/test/thread_pool_test.c b/test/thread_pool_test.c index c9a3bb71..de511f5a 100644 --- a/test/thread_pool_test.c +++ b/test/thread_pool_test.c @@ -6,32 +6,30 @@ #include "impersonator.c" #include "thread/thread_pool.c" -#define TOTAL_THREADS 50 - static void * thread_work(void *arg) { int *value = arg; - sleep(2); (*value) += 2; return NULL; } -START_TEST(tpool_work) +static void +test_threads_work(unsigned int total_threads) { struct thread_pool *pool; int **data; int i; int error; - error = thread_pool_create(TOTAL_THREADS, &pool); + error = thread_pool_create(total_threads, &pool); ck_assert_int_eq(error, 0); /* Just a dummy array where each thread will modify one slot only */ - data = calloc(TOTAL_THREADS, sizeof(int *)); + data = calloc(total_threads, sizeof(int *)); ck_assert_ptr_ne(data, NULL); - for (i = 0; i < TOTAL_THREADS; i++) { + for (i = 0; i < total_threads; i++) { data[i] = malloc(sizeof(int)); ck_assert_ptr_ne(data[i], NULL); *data[i] = 0; @@ -42,7 +40,7 @@ START_TEST(tpool_work) thread_pool_wait(pool); /* Every element should have been modified */ - for (i = 0; i < TOTAL_THREADS; i++) { + for (i = 0; i < total_threads; i++) { ck_assert_int_eq(*data[i], 2); free(data[i]); } @@ -50,18 +48,33 @@ START_TEST(tpool_work) free(data); thread_pool_destroy(pool); } + +START_TEST(tpool_single_work) +{ + test_threads_work(1); +} +END_TEST + +START_TEST(tpool_multiple_work) +{ + test_threads_work(200); +} END_TEST Suite *thread_pool_suite(void) { Suite *suite; - TCase *work; + TCase *single, *multiple; + + single = tcase_create("single_work"); + tcase_add_test(multiple, tpool_single_work); - work = tcase_create("work"); - tcase_add_test(work, tpool_work); + multiple = tcase_create("multiple_work"); + tcase_add_test(multiple, tpool_multiple_work); suite = suite_create("thread_pool_test()"); - suite_add_tcase(suite, work); + suite_add_tcase(suite, single); + suite_add_tcase(suite, multiple); return suite; }