]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Move TAL step to thread pool
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Thu, 6 Mar 2025 23:38:10 +0000 (17:38 -0600)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Thu, 6 Mar 2025 23:38:10 +0000 (17:38 -0600)
Sequential TALs meant sequential TAs.
(eg. Fort would first download the AFRINIC TA, then the APNIC TA, then
the ARIN TA, etc).
Waste of time.

Now thread pool tasks can be one of two types: TAL or RPP.
This results in TA downloads being treated like any other download.

src/object/certificate.c
src/object/tal.c
src/task.c
src/task.h
test/task_test.c

index 12a8a9682d0e39e00023a684076d08c922ccaf49..fee9f9b3fb1fd984ecb8b1ab988452648ea62551 100644 (file)
@@ -1943,7 +1943,7 @@ retry:    mft_path = cage_map_file(cage, ca->sias.rpkiManifest);
                map = ca->rpp.files + i;
                ext = map->url + strlen(map->url) - 4;
                if (strcmp(ext, ".cer") == 0)
-                       queued += task_enqueue(map, ca);
+                       queued += task_enqueue_rpp(map, ca);
                else if (strcmp(ext, ".roa") == 0)
                        roa_traverse(map, ca);
                else if (strcmp(ext, ".gbr") == 0)
index cb199627a9f475e0ac1c4b02b9352664447f2552..79e2b1bec5d5de8a25489e0c5262a59ed6635658 100644 (file)
@@ -135,6 +135,18 @@ tal_get_spki(struct tal *tal, unsigned char const **buffer, size_t *len)
        *len = tal->spki_len;
 }
 
+static int
+queue_tal(char const *tal_path, void *arg)
+{
+       if (task_enqueue_tal(tal_path) < 1) {
+               pr_op_err("Could not enqueue task '%s'; abandoning validation.",
+                   tal_path);
+               return -1;
+       }
+
+       return 0;
+}
+
 static int
 validate_ta(struct tal *tal, struct cache_mapping const *ta_map)
 {
@@ -153,7 +165,7 @@ validate_ta(struct tal *tal, struct cache_mapping const *ta_map)
 }
 
 static int
-traverse_tal(char const *tal_path, void *arg)
+traverse_tal(char const *tal_path)
 {
        struct tal tal;
        char **url;
@@ -205,10 +217,19 @@ pick_up_work(void *arg)
        struct validation_task *task = NULL;
 
        while ((task = task_dequeue(task)) != NULL) {
-               if (certificate_traverse(task->ca) == EBUSY) {
-                       task_requeue_dormant(task);
-                       task = NULL;
+               switch (task->type) {
+               case VTT_RPP:
+                       if (certificate_traverse(task->u.ca) == EBUSY) {
+                               task_requeue_dormant(task);
+                               task = NULL;
+                       }
+                       break;
+               case VTT_TAL:
+                       if (traverse_tal(task->u.tal) != 0)
+                               task_stop();
+                       break;
                }
+
        }
 
        return NULL;
@@ -218,7 +239,7 @@ int
 perform_standalone_validation(void)
 {
        pthread_t threads[5]; // XXX variabilize
-       array_index t, t2;
+       array_index t;
        int error;
 
        error = cache_prepare();
@@ -227,29 +248,26 @@ perform_standalone_validation(void)
        fnstack_init();
        task_start();
 
-       if (foreach_file(config_get_tal(), ".tal", true, traverse_tal, NULL)!=0)
+       error = foreach_file(config_get_tal(), ".tal", true, queue_tal, NULL);
+       if (error)
                goto end;
 
        for (t = 0; t < 5; t++) {
                error = pthread_create(&threads[t], NULL, pick_up_work, NULL);
-               if (error) {
-                       pr_op_err("Could not spawn validation thread %zu: %s",
+               if (error)
+                       pr_crit("pthread_create(%zu) failed: %s",
                            t, strerror(error));
-                       break;
-               }
        }
 
-       if (t == 0) {
-               pick_up_work(NULL);
-               error = 0;
-       } else for (t2 = 0; t2 < t; t2++) {
-               error = pthread_join(threads[t2], NULL);
+       for (t = 0; t < 5; t++) {
+               error = pthread_join(threads[t], NULL);
                if (error)
                        pr_crit("pthread_join(%zu) failed: %s",
-                           t2, strerror(error));
+                           t, strerror(error));
        }
 
-end:   task_stop();
+end:   if (task_stop())
+               error = EINVAL; /* pick_up_work(), VTT_TAL */
        fnstack_cleanup();
        /*
         * Commit even on failure, as there's no reason to throw away something
index a4a2b758c308efe823e80f449499380aab660447..b836f6c0e77c783428481a9d309be486ecfac7c5 100644 (file)
@@ -26,7 +26,15 @@ static pthread_cond_t awakener = PTHREAD_COND_INITIALIZER;
 static void
 task_free(struct validation_task *task)
 {
-       rpki_certificate_free(task->ca);
+       switch (task->type) {
+       case VTT_RPP:
+               rpki_certificate_free(task->u.ca);
+               break;
+       case VTT_TAL:
+               free(task->u.tal);
+               break;
+       }
+
        free(task);
 }
 
@@ -69,12 +77,18 @@ task_start(void)
        enabled = true;
 }
 
-void
+/* Returns true if the module had already been stopped. */
+bool
 task_stop(void)
 {
+       bool result;
+
        mutex_lock(&lock);
+       result = !enabled;
        cleanup();
        mutex_unlock(&lock);
+
+       return result;
 }
 
 void
@@ -84,13 +98,44 @@ task_teardown(void)
        pthread_cond_destroy(&awakener);
 }
 
+static int
+enqueue_task(struct validation_task *task)
+{
+       mutex_lock(&lock);
+       if (enabled) {
+               STAILQ_INSERT_TAIL(&waiting, task, lh);
+               task = NULL;
+               ntasks++;
+       }
+       mutex_unlock(&lock);
+
+       if (task) {
+               task_free(task); /* Couldn't queue */
+               return 0;
+       }
+
+       return 1;
+}
+
+unsigned int
+task_enqueue_tal(char const *tal_path)
+{
+       struct validation_task *task;
+
+       task = pmalloc(sizeof(struct validation_task));
+       task->type = VTT_TAL;
+       task->u.tal = pstrdup(tal_path);
+
+       return enqueue_task(task);
+}
+
 /*
  * Defers a task for later.
  * Call task_wakeup() once you've queued all your tasks.
  * Returns number of deferred tasks.
  */
 unsigned int
-task_enqueue(struct cache_mapping *map, struct rpki_certificate *parent)
+task_enqueue_rpp(struct cache_mapping *map, struct rpki_certificate *parent)
 {
        struct validation_task *task;
        struct rpki_certificate *ca;
@@ -104,22 +149,10 @@ task_enqueue(struct cache_mapping *map, struct rpki_certificate *parent)
        atomic_init(&ca->refcount, 1);
 
        task = pmalloc(sizeof(struct validation_task));
-       task->ca = ca;
-
-       mutex_lock(&lock);
-       if (enabled) {
-               STAILQ_INSERT_TAIL(&waiting, task, lh);
-               task = NULL;
-               ntasks++;
-       }
-       mutex_unlock(&lock);
+       task->type = VTT_RPP;
+       task->u.ca = ca;
 
-       if (task) {
-               task_free(task); /* Couldn't queue */
-               return 0;
-       }
-
-       return 1;
+       return enqueue_task(task);
 }
 
 /* Steals ownership of @task. */
@@ -198,7 +231,7 @@ task_dequeue(struct validation_task *prev)
                        STAILQ_REMOVE_HEAD(&waiting, lh);
                        mutex_unlock(&lock);
                        pr_op_debug("task_dequeue(): Claimed task '%s'.",
-                           task->ca->map.url);
+                           task->u.ca->map.url);
                        return task;
                }
 
index b31251cc34ace11e016d01c3650ef1d696160b28..f47d14986c508c708abac49aca43a96df46bfc4e 100644 (file)
@@ -6,17 +6,27 @@
 #include "types/map.h"
 #include "object/certificate.h"
 
+enum validation_task_type {
+       VTT_RPP,
+       VTT_TAL,
+};
+
 struct validation_task {
-       struct rpki_certificate *ca;
+       enum validation_task_type type;
+       union {
+               char *tal;
+               struct rpki_certificate *ca;
+       } u;
        STAILQ_ENTRY(validation_task) lh;
 };
 
 void task_setup(void);
 void task_start(void);
-void task_stop(void);
+bool task_stop(void);
 void task_teardown(void);
 
-unsigned int task_enqueue(struct cache_mapping *, struct rpki_certificate *);
+unsigned int task_enqueue_tal(char const *);
+unsigned int task_enqueue_rpp(struct cache_mapping *, struct rpki_certificate *);
 void task_requeue_dormant(struct validation_task *);
 void task_wakeup(void);
 void task_wakeup_dormants(void);
index bdd0d839c4b4e28e43494027ec02ae9150d65b36..54290d0add35ce317b78fb7f60a3ef70099a5484 100644 (file)
@@ -24,7 +24,7 @@ queue_1(char *mapstr)
        struct rpki_certificate parent = { 0 };
 
        map.url = map.path = mapstr;
-       ck_assert_int_eq(1, task_enqueue(&map, &parent));
+       ck_assert_int_eq(1, task_enqueue_rpp(&map, &parent));
 }
 
 static struct validation_task *
@@ -32,7 +32,7 @@ dequeue_1(char *mapstr, struct validation_task *prev)
 {
        struct validation_task *task;
        task = task_dequeue(prev);
-       ck_assert_str_eq(mapstr, task->ca->map.url);
+       ck_assert_str_eq(mapstr, task->u.ca->map.url);
        return task;
 }
 
@@ -204,12 +204,12 @@ user_thread(void *arg)
        printf("th%d: Started.\n", thid);
 
        while ((task = task_dequeue(task)) != NULL) {
-               printf("- th%d: Dequeued '%s'\n", thid, task->ca->map.url);
+               printf("- th%d: Dequeued '%s'\n", thid, task->u.ca->map.url);
                total_dequeued++;
 
-               if (certificate_traverse_mock(task->ca, thid) == EBUSY) {
+               if (certificate_traverse_mock(task->u.ca, thid) == EBUSY) {
                        printf("+ th%d: Requeuing '%s'\n",
-                           thid, task->ca->map.url);
+                           thid, task->u.ca->map.url);
                        task_requeue_dormant(task);
                        task = NULL;
                }