]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Fix rsync and thread pool bugs. 1.5.0 v1.5.0
authorpcarana <pc.moreno2099@gmail.com>
Wed, 27 Jan 2021 15:32:18 +0000 (09:32 -0600)
committerpcarana <pc.moreno2099@gmail.com>
Wed, 27 Jan 2021 15:32:18 +0000 (09:32 -0600)
+Mistakenly (of course, it was a bug) the returned value from rsync execution was being confused with the returned value from execvp call. The main problem was when rsync returned a code 12 (Error in rsync protocol data stream); in that case, the caller confused that error with ENOMEM (also with value 12), which led to terminate execution.
+The thread pool wait function wasn't considering pending taks at the queue; also the poll function was holding and releasing the mutex more than it was needed, and the thread attributes are now globally initialized (thanks @ydahhrk for the code review).
+Increment the number of threads at the internal pool to 10.

src/internal_pool.c
src/rsync/rsync.c
src/thread/thread_pool.c

index 5a0fcd4df94260b9d8f4ae067a16d830dbad44b9..513994269122eb9908249d0ecc914cd46dbd98dc 100644 (file)
@@ -10,7 +10,7 @@
  * related to the validation or server thread pool tasks) can be pushed here.
  */
 
-#define INTERNAL_POOL_MAX 5
+#define INTERNAL_POOL_MAX 10
 
 struct thread_pool *pool;
 
index 6332192d309bd135f07908a2e1c5dc47d33810f3..ed7dadbd7c722f369b0c522d24cd01a2c4afd046 100644 (file)
@@ -258,7 +258,7 @@ handle_child_thread(char **args, int fds[2][2])
            strerror(error));
 
        /* https://stackoverflow.com/a/14493459/1735458 */
-       exit(error);
+       exit(-error);
 }
 
 static int
@@ -445,8 +445,9 @@ do_rsync(struct rpki_uri *uri, bool is_ta, bool log_operation)
                if (WIFEXITED(child_status)) {
                        /* Happy path (but also sad path sometimes). */
                        error = WEXITSTATUS(child_status);
-                       pr_val_debug("Child terminated with error code %d.", error);
-                       if (error == ENOMEM)
+                       pr_val_debug("Child terminated with error code %d.",
+                           error);
+                       if (error == -ENOMEM)
                                pr_enomem();
 
                        if (!error)
index 1612237b8dd3594945539eeb2663ba6dd2339b95..7a5e19255bc9d814283dec7272ab58b1148ae57d 100644 (file)
@@ -88,6 +88,7 @@ task_queue_pull(struct task_queue *queue)
 
        tmp = TAILQ_LAST(queue, task_queue);
        TAILQ_REMOVE(queue, tmp, next);
+       pr_op_debug("Pulling a task from the pool");
 
        return tmp;
 }
@@ -97,6 +98,7 @@ static void
 task_queue_push(struct task_queue *queue, struct task *task)
 {
        TAILQ_INSERT_HEAD(queue, task, next);
+       pr_op_debug("Pushing a task to the pool");
 }
 
 /*
@@ -115,13 +117,12 @@ tasks_poll(void *arg)
        /* The thread has started, send the signal */
        thread_pool_lock(pool);
        pthread_cond_signal(&(pool->waiting_cond));
-       thread_pool_unlock(pool);
 
        while (true) {
-               thread_pool_lock(pool);
-
-               while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop)
+               while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
+                       pr_op_debug("Thread waiting for work...");
                        pthread_cond_wait(&(pool->working_cond), &(pool->lock));
+               }
 
                if (pool->stop)
                        break;
@@ -144,8 +145,6 @@ tasks_poll(void *arg)
                if (!pool->stop && pool->working_count == 0 &&
                    TAILQ_EMPTY(&(pool->queue)))
                        pthread_cond_signal(&(pool->waiting_cond));
-
-               thread_pool_unlock(pool);
        }
 
        /* The thread will cease to exist */
@@ -173,52 +172,61 @@ thread_pool_thread_wait_start(struct thread_pool *pool)
        clock_gettime(CLOCK_REALTIME, &tmout);
        tmout.tv_sec += 2;
 
+       thread_pool_lock(pool);
        error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
            &tmout);
-       if (error)
+       if (error) {
+               thread_pool_unlock(pool);
                return pr_op_errno(error, "Waiting thread to start");
+       }
+       thread_pool_unlock(pool);
 
        return 0;
 }
 
 static int
-tpool_thread_spawn(struct thread_pool *pool, thread_pool_task_cb entry_point)
+thread_pool_attr_create(pthread_attr_t *attr)
 {
-       pthread_attr_t attr;
-       pthread_t thread_id;
        int error;
 
-       memset(&thread_id, 0, sizeof(pthread_t));
-
-       error = pthread_attr_init(&attr);
+       error = pthread_attr_init(attr);
        if (error)
                return pr_op_errno(error, "Calling pthread_attr_init()");
 
        /* Use 2MB (default in most 64 bits systems) */
-       error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2);
-       if (error)
+       error = pthread_attr_setstacksize(attr, 1024 * 1024 * 2);
+       if (error) {
+               pthread_attr_destroy(attr);
                return pr_op_errno(error,
                    "Calling pthread_attr_setstacksize()");
+       }
 
-       error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-       if (error)
+       error = pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);
+       if (error) {
+               pthread_attr_destroy(attr);
                return pr_op_errno(error,
                    "Calling pthread_attr_setdetachstate()");
+       }
 
-       thread_pool_lock(pool);
-       error = pthread_create(&thread_id, &attr, entry_point, pool);
-       pthread_attr_destroy(&attr);
-       if (error) {
-               thread_pool_unlock(pool);
+       return 0;
+}
+
+static int
+tpool_thread_spawn(struct thread_pool *pool, pthread_attr_t *attr,
+    thread_pool_task_cb entry_point)
+{
+       pthread_t thread_id;
+       int error;
+
+       memset(&thread_id, 0, sizeof(pthread_t));
+
+       error = pthread_create(&thread_id, attr, entry_point, pool);
+       if (error)
                return pr_op_errno(error, "Spawning pool thread");
-       }
 
        error = thread_pool_thread_wait_start(pool);
-       if (error) {
-               thread_pool_unlock(pool);
+       if (error)
                return error;
-       }
-       thread_pool_unlock(pool);
 
        return 0;
 }
@@ -227,6 +235,7 @@ int
 thread_pool_create(unsigned int threads, struct thread_pool **pool)
 {
        struct thread_pool *tmp;
+       pthread_attr_t attr;
        unsigned int i;
        int error;
 
@@ -260,19 +269,28 @@ thread_pool_create(unsigned int threads, struct thread_pool **pool)
        TAILQ_INIT(&(tmp->queue));
        tmp->stop = false;
        tmp->working_count = 0;
-       tmp->thread_count = threads;
+       tmp->thread_count = 0;
+
+       error = thread_pool_attr_create(&attr);
+       if (error)
+               goto free_waiting_cond;
 
        for (i = 0; i < threads; i++) {
-               error = tpool_thread_spawn(tmp, tasks_poll);
+               error = tpool_thread_spawn(tmp, &attr, tasks_poll);
                if (error) {
+                       pthread_attr_destroy(&attr);
                        thread_pool_destroy(tmp);
                        return error;
                }
+               tmp->thread_count++;
                pr_op_debug("Pool thread #%u spawned", i);
        }
+       pthread_attr_destroy(&attr);
 
        *pool = tmp;
        return 0;
+free_waiting_cond:
+       pthread_cond_destroy(&(tmp->waiting_cond));
 free_working_cond:
        pthread_cond_destroy(&(tmp->working_cond));
 free_mutex:
@@ -326,15 +344,14 @@ thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg)
 
        thread_pool_lock(pool);
        task_queue_push(&(pool->queue), task);
-       thread_pool_unlock(pool);
-
        /* There's work to do! */
-       pthread_cond_broadcast(&(pool->working_cond));
+       pthread_cond_signal(&(pool->working_cond));
+       thread_pool_unlock(pool);
 
        return 0;
 }
 
-/* Are there available threads to work? */
+/* There are available threads to work? */
 bool
 thread_pool_avail_threads(struct thread_pool *pool)
 {
@@ -354,7 +371,13 @@ thread_pool_wait(struct thread_pool *pool)
        thread_pool_lock(pool);
        while (true) {
                pr_op_debug("Waiting all tasks from the pool to end");
-               if ((!pool->stop && pool->working_count != 0) ||
+               pr_op_debug("- Stop: %s", pool->stop ? "true" : "false");
+               pr_op_debug("- Working count: %u", pool->working_count);
+               pr_op_debug("- Thread count: %u", pool->thread_count);
+               pr_op_debug("- Empty queue: %s",
+                   TAILQ_EMPTY(&(pool->queue)) ? "true" : "false");
+               if ((!pool->stop &&
+                   (pool->working_count != 0 || !TAILQ_EMPTY(&(pool->queue)))) ||
                    (pool->stop && pool->thread_count != 0))
                        pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
                else