]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Use thread pool for RTR server/clients, validation cycles at main thread
authorpcarana <pc.moreno2099@gmail.com>
Tue, 24 Nov 2020 00:20:40 +0000 (18:20 -0600)
committerpcarana <pc.moreno2099@gmail.com>
Tue, 24 Nov 2020 00:20:40 +0000 (18:20 -0600)
+Change the previous logic: RTR server lived at the main thread and the validation cycles were run in a distinct thread. Now the validation cycles are run at the main thread, and RTR server is spawned at a new thread.
+Create internal thread pool to handle RTR server task and delete RRDP dirs tasks.
+Create thread pool to handle incoming RTR clients. One thread is utilized per client.
+Create args: 'thread-pool.server.max' (spawned threads to attend RTR clients) and 'thread-pool.validation.max' (spawned threads to run validation cycles).
+Shutdown all living client sockets when the application ends its execution.
+Rename 'updates_daemon.*' to 'validation_run.*'.

19 files changed:
src/Makefile.am
src/clients.c
src/clients.h
src/config.c
src/config.h
src/delete_dir_daemon.c
src/internal_pool.c [new file with mode: 0644]
src/internal_pool.h [new file with mode: 0644]
src/main.c
src/rtr/db/vrps.c
src/rtr/primitive_reader.c
src/rtr/rtr.c
src/thread/thread_pool.c
src/updates_daemon.h [deleted file]
src/validation_run.c [moved from src/updates_daemon.c with 58% similarity]
src/validation_run.h [new file with mode: 0644]
test/client_test.c
test/impersonator.c
test/thread_pool_test.c

index cf5380933791c6e9c3cd894581a8ba2661c8da29..e0416931e3189dfadfc8dc3853defea9651f022c 100644 (file)
@@ -19,6 +19,7 @@ fort_SOURCES += delete_dir_daemon.h delete_dir_daemon.c
 fort_SOURCES += extension.h extension.c
 fort_SOURCES += file.h file.c
 fort_SOURCES += init.h init.c
+fort_SOURCES += internal_pool.h internal_pool.c
 fort_SOURCES += json_parser.c json_parser.h
 fort_SOURCES += line_file.h line_file.c
 fort_SOURCES += log.h log.c
@@ -33,10 +34,10 @@ fort_SOURCES += sorted_array.h sorted_array.c
 fort_SOURCES += state.h state.c
 fort_SOURCES += str_token.h str_token.c
 fort_SOURCES += thread_var.h thread_var.c
-fort_SOURCES += updates_daemon.c updates_daemon.h
 fort_SOURCES += uri.h uri.c
 fort_SOURCES += json_handler.h json_handler.c
 fort_SOURCES += validation_handler.h validation_handler.c
+fort_SOURCES += validation_run.h validation_run.c
 fort_SOURCES += visited_uris.h visited_uris.c
 
 fort_SOURCES += asn1/content_info.h asn1/content_info.c
index 7288ca2b73963e10dc42cc06ce0d8a73634512c9..9ce2f3fb98ed95e25ac709e3ad5491e03706fa94 100644 (file)
@@ -32,7 +32,7 @@ clients_db_init(void)
 }
 
 static struct hashable_client *
-create_client(int fd, struct sockaddr_storage addr, pthread_t tid)
+create_client(int fd, struct sockaddr_storage addr)
 {
        struct hashable_client *client;
 
@@ -46,7 +46,6 @@ create_client(int fd, struct sockaddr_storage addr, pthread_t tid)
        client->meat.serial_number_set = false;
        client->meat.rtr_version_set = false;
        client->meat.addr = addr;
-       client->meat.tid = tid;
 
        return client;
 }
@@ -55,12 +54,12 @@ create_client(int fd, struct sockaddr_storage addr, pthread_t tid)
  * If the client whose file descriptor is @fd isn't already stored, store it.
  */
 int
-clients_add(int fd, struct sockaddr_storage addr, pthread_t tid)
+clients_add(int fd, struct sockaddr_storage addr)
 {
        struct hashable_client *new_client;
        struct hashable_client *old_client;
 
-       new_client = create_client(fd, addr, tid);
+       new_client = create_client(fd, addr);
        if (new_client == NULL)
                return pr_enomem();
 
@@ -82,26 +81,6 @@ clients_add(int fd, struct sockaddr_storage addr, pthread_t tid)
        return 0;
 }
 
-int
-clients_get_addr(int fd, struct sockaddr_storage *addr)
-{
-       struct hashable_client *client;
-       int result;
-
-       result = -ENOENT;
-       rwlock_read_lock(&lock);
-
-       HASH_FIND_INT(db.clients, &fd, client);
-       if (client != NULL) {
-               *addr = client->meat.addr;
-               result = 0;
-       }
-
-       rwlock_unlock(&lock);
-
-       return result;
-}
-
 void
 clients_update_serial(int fd, serial_t serial)
 {
@@ -193,8 +172,12 @@ clients_get_rtr_version_set(int fd, bool *is_set, uint8_t *rtr_version)
        return result;
 }
 
+/*
+ * Remove the client with ID @fd from the DB. If a @cb is set, it will be
+ * called before deleting the client from the DB.
+ */
 void
-clients_forget(int fd)
+clients_forget(int fd, clients_foreach_cb cb, void *arg)
 {
        struct hashable_client *client;
 
@@ -203,6 +186,9 @@ clients_forget(int fd)
        HASH_FIND_INT(db.clients, &fd, client);
        if (client != NULL) {
                HASH_DEL(db.clients, client);
+               /* Nothing to do at errors */
+               if (cb != NULL)
+                       cb(&client->meat, arg);
                free(client);
        }
 
@@ -231,17 +217,41 @@ clients_foreach(clients_foreach_cb cb, void *arg)
 }
 
 /*
- * Destroy the clients DB, the @join_thread_cb will be made for each thread
- * that was started by the parent process (@arg will be sent at that call).
+ * Terminate all clients and remove them from DB.
+ *
+ * @cb (with @arg) will be called before deleting the corresponding client from
+ * the DB.
+ */
+int
+clients_terminate_all(clients_foreach_cb cb, void *arg)
+{
+       struct hashable_client *node, *tmp;
+       int error;
+
+       rwlock_write_lock(&lock);
+
+       HASH_ITER(hh, db.clients, node, tmp) {
+               error = cb(&node->meat, arg);
+               if (error)
+                       break;
+               HASH_DEL(db.clients, node);
+               free(node);
+       }
+
+       rwlock_unlock(&lock);
+
+       return error;
+}
+
+/*
+ * Destroy the clients DB.
  */
 void
-clients_db_destroy(join_thread_cb cb, void *arg)
+clients_db_destroy(void)
 {
        struct hashable_client *node, *tmp;
 
        HASH_ITER(hh, db.clients, node, tmp) {
-               /* Not much to do on failure */
-               cb(node->meat.tid, arg);
                HASH_DEL(db.clients, node);
                free(node);
        }
index 18e00dd8dda6fe52031d450c14a68367fb1b4fec..b22792ab4660d4bb2bb5cd2dbd64cfdbdc879608 100644 (file)
@@ -1,7 +1,6 @@
 #ifndef SRC_CLIENTS_H_
 #define SRC_CLIENTS_H_
 
-#include <pthread.h>
 #include <stdbool.h>
 #include <netinet/in.h>
 #include "rtr/pdu.h"
@@ -10,11 +9,6 @@
 struct client {
        int fd;
        struct sockaddr_storage addr;
-       /*
-        * The join should be made when the db is cleared, so the main process
-        * should do it.
-        */
-       pthread_t tid;
 
        serial_t serial_number;
        bool serial_number_set;
@@ -25,18 +19,19 @@ struct client {
 
 int clients_db_init(void);
 
-int clients_add(int, struct sockaddr_storage, pthread_t);
+int clients_add(int, struct sockaddr_storage);
 void clients_update_serial(int, serial_t);
-void clients_forget(int);
+
 typedef int (*clients_foreach_cb)(struct client *, void *);
+void clients_forget(int, clients_foreach_cb, void *);
 int clients_foreach(clients_foreach_cb, void *);
+
 int clients_get_min_serial(serial_t *);
-int clients_get_addr(int, struct sockaddr_storage *);
 
 int clients_set_rtr_version(int, uint8_t);
 int clients_get_rtr_version_set(int, bool *, uint8_t *);
 
-typedef int (*join_thread_cb)(pthread_t, void *);
-void clients_db_destroy(join_thread_cb, void *);
+int clients_terminate_all(clients_foreach_cb, void *);
+void clients_db_destroy(void);
 
 #endif /* SRC_CLIENTS_H_ */
index d77935b344af854b50577d46e5545684456cd4af..23e90b6f781cb197e426f94766581657b85b764a 100644 (file)
@@ -74,7 +74,6 @@ struct rpki_config {
                char *port;
                /** Outstanding connections in the socket's listen queue */
                unsigned int backlog;
-
                struct {
                        /** Interval used to look for updates at VRPs location */
                        unsigned int validation;
@@ -206,6 +205,18 @@ struct rpki_config {
 
        /* HTTPS URLS from where the TALS will be fetched */
        struct init_locations init_tal_locations;
+
+       /* Thread pools for specific tasks */
+       struct {
+               /* Threads related to RTR server */
+               struct {
+                       unsigned int max;
+               } server;
+               /* Threads related to validation cycles */
+               struct {
+                       unsigned int max;
+               } validation;
+       } thread_pool;
 };
 
 static void print_usage(FILE *, bool);
@@ -754,6 +765,27 @@ static const struct option_field options[] = {
                .availability = AVAILABILITY_JSON,
        },
 
+       {
+               .id = 12000,
+               .name = "thread-pool.server.max",
+               .type = &gt_uint,
+               .offset = offsetof(struct rpki_config, thread_pool.server.max),
+               .doc = "Maximum number of active threads (one thread per RTR client) that can live at the thread pool",
+               .min = 1,
+               /* Would somebody connect more than 400 routers? */
+               .max = 400,
+       },
+       {
+               .id = 12001,
+               .name = "thread-pool.validation.max",
+               .type = &gt_uint,
+               .offset = offsetof(struct rpki_config,
+                   thread_pool.validation.max),
+               .doc = "Maximum number of active threads (one thread per TAL) that can live at the thread pool",
+               .min = 1,
+               .max = 20,
+       },
+
        { 0 },
 };
 
@@ -1037,6 +1069,11 @@ set_default_values(void)
        if (error)
                goto revert_init_locations;
 
+       /* Common scenario is to connect 1 router or a couple of them */
+       rpki_config.thread_pool.server.max = 20;
+       /* Usually 5 TALs, let a few more available */
+       rpki_config.thread_pool.validation.max = 10;
+
        return 0;
 revert_init_locations:
        free(rpki_config.validation_log.tag);
@@ -1541,6 +1578,18 @@ config_get_stale_repository_period(void)
        return rpki_config.stale_repository_period;
 }
 
+unsigned int
+config_get_thread_pool_server_max(void)
+{
+       return rpki_config.thread_pool.server.max;
+}
+
+unsigned int
+config_get_thread_pool_validation_max(void)
+{
+       return rpki_config.thread_pool.validation.max;
+}
+
 void
 config_set_rsync_enabled(bool value)
 {
index 0a8c8d642360ac6d0a49da662b80abc1b31d1296..ab8b3742baf20fc9b0978d62c2bc9c8d93299eba 100644 (file)
@@ -51,6 +51,8 @@ char const *config_get_output_roa(void);
 char const *config_get_output_bgpsec(void);
 unsigned int config_get_asn1_decode_max_stack(void);
 unsigned int config_get_stale_repository_period(void);
+unsigned int config_get_thread_pool_server_max(void);
+unsigned int config_get_thread_pool_validation_max(void);
 
 /* Logging getters */
 bool config_get_op_log_enabled(void);
index 92f25843162e135f4835676e605924457ee3c927..009f377eaf5fa7fb2bfdd138d967e7945600963e 100644 (file)
@@ -5,11 +5,11 @@
 #include <sys/stat.h>
 #include <errno.h>
 #include <ftw.h>
-#include <pthread.h>
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
 #include "common.h"
+#include "internal_pool.h"
 #include "log.h"
 #include "random.h"
 
@@ -80,9 +80,8 @@ remove_from_root(void *arg)
        dirs_arr = root_arg->arr;
        len = root_arg->arr_set;
 
-       /* Release received arg, and detach thread */
+       /* Release received arg */
        free(root_arg);
-       pthread_detach(pthread_self());
 
        for (i = 0; i < len; i++) {
                error = nftw(dirs_arr[i], traverse, MAX_FD_ALLOWED,
@@ -276,7 +275,6 @@ rem_dirs_destroy(struct rem_dirs *rem_dirs)
 int
 delete_dir_daemon_start(char **roots, size_t roots_len, char const *workspace)
 {
-       pthread_t thread;
        struct rem_dirs *arg;
        int error;
 
@@ -291,12 +289,11 @@ delete_dir_daemon_start(char **roots, size_t roots_len, char const *workspace)
                return error;
        }
 
-       /* Thread arg is released at thread before being detached */
-       errno = pthread_create(&thread, NULL, remove_from_root, (void *) arg);
-       if (errno) {
+       /* Thread arg is released at thread */
+       error = internal_pool_push(remove_from_root, (void *) arg);
+       if (error) {
                rem_dirs_destroy(arg);
-               return pr_op_errno(errno,
-                   "Could not spawn the delete dir daemon thread");
+               return error;
        }
 
        return 0;
diff --git a/src/internal_pool.c b/src/internal_pool.c
new file mode 100644 (file)
index 0000000..5a0fcd4
--- /dev/null
@@ -0,0 +1,40 @@
+#include "internal_pool.h"
+
+#include <stddef.h>
+
+/*
+ * This is a basic wrapper for thread_pool functions, but the allocated pool
+ * lives at the main thread.
+ *
+ * Additional threads that must be spawned during execution (those that aren't
+ * related to the validation or server thread pool tasks) can be pushed here.
+ */
+
+#define INTERNAL_POOL_MAX 5
+
+struct thread_pool *pool;
+
+int
+internal_pool_init(void)
+{
+       int error;
+
+       pool = NULL;
+       error = thread_pool_create(INTERNAL_POOL_MAX, &pool);
+       if (error)
+               return error;
+
+       return 0;
+}
+
+int
+internal_pool_push(thread_pool_task_cb cb, void *arg)
+{
+       return thread_pool_push(pool, cb, arg);
+}
+
+void
+internal_pool_cleanup(void)
+{
+       thread_pool_destroy(pool);
+}
diff --git a/src/internal_pool.h b/src/internal_pool.h
new file mode 100644 (file)
index 0000000..77fc1fc
--- /dev/null
@@ -0,0 +1,10 @@
+#ifndef SRC_INTERNAL_POOL_H_
+#define SRC_INTERNAL_POOL_H_
+
+#include "thread/thread_pool.h"
+
+int internal_pool_init(void);
+int internal_pool_push(thread_pool_task_cb, void *);
+void internal_pool_cleanup(void);
+
+#endif /* SRC_INTERNAL_POOL_H_ */
index fd91f86b656f00523661d51641f1edbf95b2f360..c0b99feb408fcb4e98c0495d141abe694b862cc8 100644 (file)
@@ -2,6 +2,7 @@
 #include "config.h"
 #include "debug.h"
 #include "extension.h"
+#include "internal_pool.h"
 #include "nid.h"
 #include "reqs_errors.h"
 #include "thread_var.h"
@@ -68,13 +69,19 @@ __main(int argc, char **argv)
        if (error)
                goto revert_nid;
 
-       error = relax_ng_init();
+       error = internal_pool_init();
        if (error)
                goto revert_http;
 
+       error = relax_ng_init();
+       if (error)
+               goto revert_pool;
+
        error = start_rtr_server();
 
        relax_ng_cleanup();
+revert_pool:
+       internal_pool_cleanup();
 revert_http:
        http_cleanup();
 revert_nid:
index 7e89728a70369b9cf3f2ab8605434aca909c05b2..ceaa46c3f268134f8fea7e60fa73f52171a42fff 100644 (file)
@@ -86,9 +86,9 @@ vrps_init(void)
        time_t now;
        int error;
 
-       /* FIXME (now) Configure max threads for tals pool */
        pool = NULL;
-       error = thread_pool_create(10, &pool);
+       error = thread_pool_create(config_get_thread_pool_validation_max(),
+           &pool);
        if (error)
                return error;
 
index 112ac6cd9293b00d367ba206abf81cfa4dc4134e..60569bf15583b251eb76dfa6d9f44f403e7c6001 100644 (file)
@@ -17,7 +17,7 @@ static void place_null_character(rtr_char *, size_t);
  * Reads exactly @buffer_len bytes from @buffer, erroring if this goal cannot be
  * met.
  *
- * If @allow_end is true, will allow immediate EOF in place of the buffer bytes.
+ * If @allow_eof is true, will allow immediate EOF in place of the buffer bytes.
  * (Though all this really means is that the corresponding warning message will
  * not be printed, which is perfectly fine as far as the only current caller is
  * concerned.)
@@ -33,7 +33,8 @@ read_exact(int fd, unsigned char *buffer, size_t buffer_len, bool allow_eof)
        for (offset = 0; offset < buffer_len; offset += read_result) {
                read_result = read(fd, &buffer[offset], buffer_len - offset);
                if (read_result == -1)
-                       return -pr_op_errno(errno, "Client socket read interrupted");
+                       return -pr_op_errno(errno,
+                           "Client socket read interrupted");
 
                if (read_result == 0) {
                        if (!allow_eof)
index 6dd70e65cd466a27f735093568d063225e8b6e0e..105cc4e8d9f7a74610ca623689b563965ccc57e2 100644 (file)
@@ -3,7 +3,6 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <netdb.h>
-#include <pthread.h>
 #include <signal.h>
 #include <stdbool.h>
 #include <stdio.h>
 
 #include "config.h"
 #include "clients.h"
+#include "internal_pool.h"
 #include "log.h"
-#include "updates_daemon.h"
+#include "validation_run.h"
 #include "rtr/err_pdu.h"
 #include "rtr/pdu.h"
 #include "rtr/db/vrps.h"
+#include "thread/thread_pool.h"
+
+/* Constant messages regarding a client status */
+#define CL_ACCEPTED   "accepted"
+#define CL_CLOSED     "closed"
+#define CL_TERMINATED "terminated"
 
 /* Parameters for each thread that handles client connections */
 struct thread_param {
        int fd;
-       pthread_t tid;
        struct sockaddr_storage addr;
 };
 
@@ -37,6 +42,15 @@ struct fd_node {
 /* List of server sockets */
 SLIST_HEAD(server_fds, fd_node);
 
+/* Does the server needs to be stopped? */
+static volatile bool server_stop;
+
+/* Parameters for the RTR server task */
+struct rtr_task_param {
+       struct server_fds *fds;
+       struct thread_pool *pool;
+};
+
 static int
 init_addrinfo(char const *hostname, char const *service,
     struct addrinfo **result)
@@ -217,7 +231,7 @@ server_fd_add(struct server_fds *fds, char const *address, char const *service)
 }
 
 static void
-server_fd_cleanup(struct server_fds *fds)
+server_fds_destroy(struct server_fds *fds)
 {
        struct fd_node *fd;
 
@@ -227,6 +241,7 @@ server_fd_cleanup(struct server_fds *fds)
                close(fd->id);
                free(fd);
        }
+       free(fds);
 }
 
 static int
@@ -298,20 +313,17 @@ create_server_sockets(struct server_fds *fds)
                error = parse_address(addresses->array[i], default_service,
                    &address, &service);
                if (error)
-                       goto cleanup_fds;
+                       return error;
 
                error = server_fd_add(fds, address, service);
                /* Always release them */
                free(address);
                free(service);
                if (error)
-                       goto cleanup_fds;
+                       return error;
        }
 
        return 0;
-cleanup_fds:
-       server_fd_cleanup(fds);
-       return error;
 }
 
 enum verdict {
@@ -378,25 +390,16 @@ clean_request(struct rtr_request *request, const struct pdu_metadata *meta)
        meta->destructor(request->pdu);
 }
 
-static void
-print_close_failure(int error, int fd)
+static int
+print_close_failure(int error, struct sockaddr_storage *sockaddr)
 {
-       struct sockaddr_storage sockaddr;
        char buffer[INET6_ADDRSTRLEN];
        char const *addr_str;
 
-       addr_str = (clients_get_addr(fd, &sockaddr) == 0)
-           ? sockaddr2str(&sockaddr, buffer)
-           : "(unknown)";
+       addr_str = sockaddr2str(sockaddr, buffer);
 
-       pr_op_errno(error, "close() failed on socket of client %s", addr_str);
-}
-
-static void
-end_client(int fd)
-{
-       if (close(fd) != 0)
-               print_close_failure(errno, fd);
+       return pr_op_errno(error, "close() failed on socket of client %s",
+           addr_str);
 }
 
 static void
@@ -407,6 +410,19 @@ print_client_addr(struct sockaddr_storage *addr, char const *action, int fd)
            sockaddr2str(addr, buffer));
 }
 
+static int
+end_client(struct client *client, void *arg)
+{
+       if (arg != NULL && strcmp(arg, CL_TERMINATED) == 0)
+               shutdown(client->fd, SHUT_RDWR);
+
+       if (close(client->fd) != 0)
+               return print_close_failure(errno, &client->addr);
+
+       print_client_addr(&(client->addr), arg, client->fd);
+       return 0;
+}
+
 /*
  * The client socket threads' entry routine.
  * @arg must be released.
@@ -422,7 +438,7 @@ client_thread_cb(void *arg)
        memcpy(&param, arg, sizeof(param));
        free(arg);
 
-       error = clients_add(param.fd, param.addr, param.tid);
+       error = clients_add(param.fd, param.addr);
        if (error) {
                close(param.fd);
                return NULL;
@@ -439,12 +455,7 @@ client_thread_cb(void *arg)
                        break;
        }
 
-       print_client_addr(&param.addr, "closed", param.fd);
-       end_client(param.fd);
-       clients_forget(param.fd);
-
-       /* Release to avoid the wait till the parent tries to join */
-       pthread_detach(param.tid);
+       clients_forget(param.fd, end_client, CL_CLOSED);
 
        return NULL;
 }
@@ -454,20 +465,20 @@ init_fdset(struct server_fds *fds, fd_set *fdset)
 {
        struct fd_node *node;
 
-       FD_ZERO (fdset);
+       FD_ZERO(fdset);
        SLIST_FOREACH(node, fds, next)
-               FD_SET (node->id, fdset);
-
+               FD_SET(node->id, fdset);
 }
 
 /*
  * Waits for client connections and spawns threads to handle them.
  */
-static int
-handle_client_connections(struct server_fds *fds)
+static void *
+handle_client_connections(void *arg)
 {
-       struct fd_node *head_node;
-       struct sigaction ign;
+       struct rtr_task_param *rtr_param = arg;
+       struct server_fds *fds;
+       struct thread_pool *pool;
        struct sockaddr_storage client_addr;
        struct thread_param *param;
        struct timeval select_time;
@@ -478,30 +489,28 @@ handle_client_connections(struct server_fds *fds)
        int fd;
        int error;
 
-       /* Ignore SIGPIPES, they're handled apart */
-       ign.sa_handler = SIG_IGN;
-       ign.sa_flags = 0;
-       sigemptyset(&ign.sa_mask);
-       sigaction(SIGPIPE, &ign, NULL);
-
-       head_node = SLIST_FIRST(fds);
-       last_server_fd = head_node->id;
+       /* Get the argument pointers, and release arg at once */
+       fds = rtr_param->fds;
+       pool = rtr_param->pool;
+       free(rtr_param);
 
-       SLIST_FOREACH(head_node, fds, next) {
-               error = listen(head_node->id, config_get_server_queue());
-               if (error)
-                       return pr_op_errno(errno,
-                           "Couldn't listen on server socket.");
-       }
+       last_server_fd = SLIST_FIRST(fds)->id;
 
        sizeof_client_addr = sizeof(client_addr);
 
+       /* I'm alive! */
+       server_stop = false;
+
        pr_op_debug("Waiting for client connections at server...");
        do {
                /* Look for connections every .2 seconds*/
                select_time.tv_sec = 0;
                select_time.tv_usec = 200000;
 
+               /* Am I still alive? */
+               if (server_stop)
+                       break;
+
                init_fdset(fds, &readfds);
 
                if (select(last_server_fd + 1, &readfds, NULL, NULL,
@@ -523,10 +532,10 @@ handle_client_connections(struct server_fds *fds)
                        case VERDICT_RETRY:
                                continue;
                        case VERDICT_EXIT:
-                               return -EINVAL;
+                               return NULL;
                        }
 
-                       print_client_addr(&client_addr, "accepted", client_fd);
+                       print_client_addr(&client_addr, CL_ACCEPTED, client_fd);
 
                        /*
                         * Note: My gut says that errors from now on (even the
@@ -545,46 +554,56 @@ handle_client_connections(struct server_fds *fds)
                        param->fd = client_fd;
                        param->addr = client_addr;
 
-                       error = pthread_create(&param->tid, NULL,
-                           client_thread_cb, param);
-                       if (error && error != EAGAIN)
+                       error = thread_pool_push(pool, client_thread_cb,
+                           param);
+                       if (error) {
+                               pr_op_err("Couldn't push a thread to attend incoming RTR client");
                                /* Error with min RTR version */
                                err_pdu_send_internal_error(client_fd, RTR_V0);
-                       if (error) {
-                               pr_op_errno(error,
-                                   "Could not spawn the client's thread");
                                close(client_fd);
                                free(param);
                        }
                }
        } while (true);
 
-       return 0; /* Unreachable. */
+       return NULL; /* Unreachable. */
 }
 
-/*
- * Receive @arg to be called as a clients_foreach_cb
- */
 static int
-kill_client(struct client *client, void *arg)
+__handle_client_connections(struct server_fds *fds, struct thread_pool *pool)
 {
-       end_client(client->fd);
-       print_client_addr(&(client->addr), "terminated", client->fd);
-       /* Don't call clients_forget to avoid deadlock! */
-       return 0;
-}
+       struct rtr_task_param *param;
+       struct fd_node *node;
+       struct sigaction ign;
+       int error;
 
-static void
-end_clients(void)
-{
-       clients_foreach(kill_client, NULL);
-       /* Let the clients be deleted when clients DB is destroyed */
-}
+       /* Ignore SIGPIPES, they're handled apart */
+       ign.sa_handler = SIG_IGN;
+       ign.sa_flags = 0;
+       sigemptyset(&ign.sa_mask);
+       sigaction(SIGPIPE, &ign, NULL);
+
+       SLIST_FOREACH(node, fds, next) {
+               error = listen(node->id, config_get_server_queue());
+               if (error)
+                       return pr_op_errno(errno,
+                           "Couldn't listen on server socket.");
+       }
+
+       param = malloc(sizeof(struct rtr_task_param));
+       if (param == NULL)
+               return pr_enomem();
+
+       param->fds = fds;
+       param->pool = pool;
+
+       /* handle_client_connections() must release param */
+       error = internal_pool_push(handle_client_connections, param);
+       if (error) {
+               free(param);
+               return error;
+       }
 
-static int
-join_thread(pthread_t tid, void *arg)
-{
-       close_thread(tid, "Client");
        return 0;
 }
 
@@ -599,38 +618,61 @@ join_thread(pthread_t tid, void *arg)
 int
 rtr_listen(void)
 {
-       bool changed;
-       struct server_fds fds; /* "file descriptors" */
+       struct server_fds *fds; /* "file descriptors" */
+       struct thread_pool *pool;
        int error;
 
+       server_stop = true;
+
        error = clients_db_init();
        if (error)
                return error;
 
        if (config_get_mode() == STANDALONE) {
-               error = vrps_update(&changed);
-               if (error)
-                       pr_op_err("Error %d while trying to update the ROA database.",
-                           error);
+               error = validation_run_first();
                goto revert_clients_db; /* Error 0 it's ok */
        }
 
-       SLIST_INIT(&fds);
-       error = create_server_sockets(&fds);
-       if (error)
+       fds = malloc(sizeof(struct server_fds));
+       if (fds == NULL) {
+               error = pr_enomem();
                goto revert_clients_db;
+       }
+
+       SLIST_INIT(fds);
+       error = create_server_sockets(fds);
+       if (error)
+               goto revert_server_fds;
+
+       pool = NULL;
+       error = thread_pool_create(config_get_thread_pool_server_max(), &pool);
+       if (error)
+               goto revert_server_fds;
 
-       error = updates_daemon_start();
+       /* Do the first run */
+       error = validation_run_first();
        if (error)
-               goto revert_server_sockets;
+               goto revert_thread_pool;
+
+       /* Wait for connections at another thread */
+       error = __handle_client_connections(fds, pool);
+       if (error)
+               goto revert_thread_pool;
+
+       /* Keep running the validations on the main thread */
+       error = validation_run_cycle();
+
+       /* Terminate all clients */
+       clients_terminate_all(end_client, CL_TERMINATED);
 
-       error = handle_client_connections(&fds);
+       /* Stop the server (it lives on a detached thread) */
+       server_stop = true;
 
-       end_clients();
-       updates_daemon_destroy();
-revert_server_sockets:
-       server_fd_cleanup(&fds);
+revert_thread_pool:
+       thread_pool_destroy(pool);
+revert_server_fds:
+       server_fds_destroy(fds);
 revert_clients_db:
-       clients_db_destroy(join_thread, NULL);
+       clients_db_destroy();
        return error;
 }
index 4cac370ee8925226a6a1e5f19fdaa2bee3b626e9..908e87bc9c6ba05b21ce496025ba3a5eb5dd8fd3 100644 (file)
@@ -121,13 +121,13 @@ tasks_poll(void *arg)
        while (true) {
                thread_pool_lock(pool);
 
-               while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
+               while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop)
                        pthread_cond_wait(&(pool->working_cond), &(pool->lock));
-               }
+
                if (pool->stop)
                        break;
 
-               /* Pop the tail */
+               /* Pull the tail */
                task = task_queue_pull(&(pool->queue));
                pool->working_count++;
                pr_op_debug("Working on task #%u", pool->working_count);
@@ -195,8 +195,7 @@ tpool_thread_spawn(struct thread_pool *pool, thread_pool_task_cb entry_point)
        if (error)
                return pr_op_errno(error, "Calling pthread_attr_init()");
 
-       /* FIXME (now) Let at 2MB? */
-       /* 2MB */
+       /* Use 2MB (default in most 64 bits systems) */
        error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2);
        if (error)
                return pr_op_errno(error,
@@ -328,9 +327,10 @@ thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg)
 
        thread_pool_lock(pool);
        task_queue_push(&(pool->queue), task);
+       thread_pool_unlock(pool);
+
        /* There's work to do! */
        pthread_cond_broadcast(&(pool->working_cond));
-       thread_pool_unlock(pool);
 
        return 0;
 }
diff --git a/src/updates_daemon.h b/src/updates_daemon.h
deleted file mode 100644 (file)
index ea269e5..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
-#ifndef SRC_UPDATES_DAEMON_H_
-#define SRC_UPDATES_DAEMON_H_
-
-int updates_daemon_start(void);
-void updates_daemon_destroy(void);
-
-#endif /* SRC_UPDATES_DAEMON_H_ */
similarity index 58%
rename from src/updates_daemon.c
rename to src/validation_run.c
index 7adf70f3b08dc2308e51226a2c62d09b66c53e8c..9ac9a18be690d5bcb3caa5321c98b698130ab8b8 100644 (file)
@@ -1,26 +1,39 @@
-#include "updates_daemon.h"
+#include "validation_run.h"
 
-#include <errno.h>
 #include <stdbool.h>
 #include <unistd.h>
 
-#include "common.h"
 #include "config.h"
 #include "log.h"
 #include "notify.h"
-#include "object/tal.h"
 #include "rtr/db/vrps.h"
 
-static pthread_t thread;
+/* Runs a single cycle, use at standalone mode or before running RTR server */
+int
+validation_run_first(void)
+{
+       bool upd;
+       int error;
 
-static void *
-check_vrps_updates(void *param_void)
+       upd = false;
+       error = vrps_update(&upd);
+       if (error)
+               return pr_op_err("First validation wasn't successful.");
+
+       return 0;
+}
+
+/* Run a validation cycle each 'server.interval.validation' secs */
+int
+validation_run_cycle(void)
 {
+       unsigned int validation_interval;
        bool changed;
        int error;
 
+       validation_interval = config_get_validation_interval();
        do {
-               sleep(config_get_validation_interval());
+               sleep(validation_interval);
 
                error = vrps_update(&changed);
                if (error == -EINTR)
@@ -41,29 +54,5 @@ check_vrps_updates(void *param_void)
                }
        } while (true);
 
-       return NULL;
-}
-
-int
-updates_daemon_start(void)
-{
-       bool changed;
-       int error;
-
-       error = vrps_update(&changed);
-       if (error)
-               return pr_op_err("First validation wasn't successful.");
-
-       errno = pthread_create(&thread, NULL, check_vrps_updates, NULL);
-       if (errno)
-               return -pr_op_errno(errno,
-                   "Could not spawn the update daemon thread");
-
-       return 0;
-}
-
-void
-updates_daemon_destroy(void)
-{
-       close_thread(thread, "Validation");
+       return error;
 }
diff --git a/src/validation_run.h b/src/validation_run.h
new file mode 100644 (file)
index 0000000..7506cdd
--- /dev/null
@@ -0,0 +1,7 @@
+#ifndef SRC_VALIDATION_RUN_H_
+#define SRC_VALIDATION_RUN_H_
+
+int validation_run_first(void);
+int validation_run_cycle(void);
+
+#endif /* SRC_VALIDATION_RUN_H_ */
index b626920e379ec913bdbb2e213d560f6c4a087f44..efa4a0da4a2b9968defda9c5f3fc9adccfa87cc6 100644 (file)
@@ -29,13 +29,6 @@ handle_foreach(struct client *client, void *arg)
        return 0;
 }
 
-static int
-join_threads(pthread_t tid, void *arg)
-{
-       /* Empty, since no threads are alive */
-       return 0;
-}
-
 START_TEST(basic_test)
 {
        /*
@@ -58,19 +51,19 @@ START_TEST(basic_test)
         */
 
        for (i = 0; i < 4; i++) {
-               ck_assert_int_eq(0, clients_add(1, addr, 10));
-               ck_assert_int_eq(0, clients_add(2, addr, 20));
-               ck_assert_int_eq(0, clients_add(3, addr, 30));
-               ck_assert_int_eq(0, clients_add(4, addr, 40));
+               ck_assert_int_eq(0, clients_add(1, addr));
+               ck_assert_int_eq(0, clients_add(2, addr));
+               ck_assert_int_eq(0, clients_add(3, addr));
+               ck_assert_int_eq(0, clients_add(4, addr));
        }
 
-       clients_forget(3);
+       clients_forget(3, NULL, NULL);
 
        state = 0;
        ck_assert_int_eq(0, clients_foreach(handle_foreach, &state));
        ck_assert_uint_eq(3, state);
 
-       clients_db_destroy(join_threads, NULL);
+       clients_db_destroy();
 }
 END_TEST
 
index 96fd8a515c99044762ca87182c8453d3f312d68b..a22323150542d92bf7aa8301fdbc0cc7f3b09097 100644 (file)
@@ -307,3 +307,9 @@ config_set_http_priority(unsigned int value)
 {
        http_priority = value;
 }
+
+unsigned int
+config_get_thread_pool_validation_max(void)
+{
+       return 10;
+}
index c9a3bb71bb0908359e4bf4f7ee9549d4aac4ad48..de511f5a1b462d08217d054e6f56cae2fae38aab 100644 (file)
@@ -6,32 +6,30 @@
 #include "impersonator.c"
 #include "thread/thread_pool.c"
 
-#define TOTAL_THREADS 50
-
 static void *
 thread_work(void *arg)
 {
        int *value = arg;
-       sleep(2);
        (*value) += 2;
        return NULL;
 }
 
-START_TEST(tpool_work)
+static void
+test_threads_work(unsigned int total_threads)
 {
        struct thread_pool *pool;
        int **data;
        int i;
        int error;
 
-       error = thread_pool_create(TOTAL_THREADS, &pool);
+       error = thread_pool_create(total_threads, &pool);
        ck_assert_int_eq(error, 0);
 
        /* Just a dummy array where each thread will modify one slot only */
-       data = calloc(TOTAL_THREADS, sizeof(int *));
+       data = calloc(total_threads, sizeof(int *));
        ck_assert_ptr_ne(data, NULL);
 
-       for (i = 0; i < TOTAL_THREADS; i++) {
+       for (i = 0; i < total_threads; i++) {
                data[i] = malloc(sizeof(int));
                ck_assert_ptr_ne(data[i], NULL);
                *data[i] = 0;
@@ -42,7 +40,7 @@ START_TEST(tpool_work)
        thread_pool_wait(pool);
 
        /* Every element should have been modified */
-       for (i = 0; i < TOTAL_THREADS; i++) {
+       for (i = 0; i < total_threads; i++) {
                ck_assert_int_eq(*data[i], 2);
                free(data[i]);
        }
@@ -50,18 +48,33 @@ START_TEST(tpool_work)
        free(data);
        thread_pool_destroy(pool);
 }
+
+START_TEST(tpool_single_work)
+{
+       test_threads_work(1);
+}
+END_TEST
+
+START_TEST(tpool_multiple_work)
+{
+       test_threads_work(200);
+}
 END_TEST
 
 Suite *thread_pool_suite(void)
 {
        Suite *suite;
-       TCase *work;
+       TCase *single, *multiple;
+
+       single = tcase_create("single_work");
+       tcase_add_test(multiple, tpool_single_work);
 
-       work = tcase_create("work");
-       tcase_add_test(work, tpool_work);
+       multiple = tcase_create("multiple_work");
+       tcase_add_test(multiple, tpool_multiple_work);
 
        suite = suite_create("thread_pool_test()");
-       suite_add_tcase(suite, work);
+       suite_add_tcase(suite, single);
+       suite_add_tcase(suite, multiple);
 
        return suite;
 }