From: Alberto Leiva Popper Date: Thu, 6 Mar 2025 23:38:10 +0000 (-0600) Subject: Move TAL step to thread pool X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fd961c9472277f95b9ad47bbd480eac1fd6a4cf7;p=thirdparty%2FFORT-validator.git Move TAL step to thread pool Sequential TALs meant sequential TAs. (eg. Fort would first download the AFRINIC TA, then the APNIC TA, then the ARIN TA, etc). Waste of time. Now thread pool tasks can be one of two types: TAL or RPP. This results in TA downloads being treated like any other download. --- diff --git a/src/object/certificate.c b/src/object/certificate.c index 12a8a968..fee9f9b3 100644 --- a/src/object/certificate.c +++ b/src/object/certificate.c @@ -1943,7 +1943,7 @@ retry: mft_path = cage_map_file(cage, ca->sias.rpkiManifest); map = ca->rpp.files + i; ext = map->url + strlen(map->url) - 4; if (strcmp(ext, ".cer") == 0) - queued += task_enqueue(map, ca); + queued += task_enqueue_rpp(map, ca); else if (strcmp(ext, ".roa") == 0) roa_traverse(map, ca); else if (strcmp(ext, ".gbr") == 0) diff --git a/src/object/tal.c b/src/object/tal.c index cb199627..79e2b1be 100644 --- a/src/object/tal.c +++ b/src/object/tal.c @@ -135,6 +135,18 @@ tal_get_spki(struct tal *tal, unsigned char const **buffer, size_t *len) *len = tal->spki_len; } +static int +queue_tal(char const *tal_path, void *arg) +{ + if (task_enqueue_tal(tal_path) < 1) { + pr_op_err("Could not enqueue task '%s'; abandoning validation.", + tal_path); + return -1; + } + + return 0; +} + static int validate_ta(struct tal *tal, struct cache_mapping const *ta_map) { @@ -153,7 +165,7 @@ validate_ta(struct tal *tal, struct cache_mapping const *ta_map) } static int -traverse_tal(char const *tal_path, void *arg) +traverse_tal(char const *tal_path) { struct tal tal; char **url; @@ -205,10 +217,19 @@ pick_up_work(void *arg) struct validation_task *task = NULL; while ((task = task_dequeue(task)) != NULL) { - if (certificate_traverse(task->ca) == EBUSY) { - task_requeue_dormant(task); - task = NULL; + switch (task->type) { + case VTT_RPP: + if (certificate_traverse(task->u.ca) == EBUSY) { + task_requeue_dormant(task); + task = NULL; + } + break; + case VTT_TAL: + if (traverse_tal(task->u.tal) != 0) + task_stop(); + break; } + } return NULL; @@ -218,7 +239,7 @@ int perform_standalone_validation(void) { pthread_t threads[5]; // XXX variabilize - array_index t, t2; + array_index t; int error; error = cache_prepare(); @@ -227,29 +248,26 @@ perform_standalone_validation(void) fnstack_init(); task_start(); - if (foreach_file(config_get_tal(), ".tal", true, traverse_tal, NULL)!=0) + error = foreach_file(config_get_tal(), ".tal", true, queue_tal, NULL); + if (error) goto end; for (t = 0; t < 5; t++) { error = pthread_create(&threads[t], NULL, pick_up_work, NULL); - if (error) { - pr_op_err("Could not spawn validation thread %zu: %s", + if (error) + pr_crit("pthread_create(%zu) failed: %s", t, strerror(error)); - break; - } } - if (t == 0) { - pick_up_work(NULL); - error = 0; - } else for (t2 = 0; t2 < t; t2++) { - error = pthread_join(threads[t2], NULL); + for (t = 0; t < 5; t++) { + error = pthread_join(threads[t], NULL); if (error) pr_crit("pthread_join(%zu) failed: %s", - t2, strerror(error)); + t, strerror(error)); } -end: task_stop(); +end: if (task_stop()) + error = EINVAL; /* pick_up_work(), VTT_TAL */ fnstack_cleanup(); /* * Commit even on failure, as there's no reason to throw away something diff --git a/src/task.c b/src/task.c index a4a2b758..b836f6c0 100644 --- a/src/task.c +++ b/src/task.c @@ -26,7 +26,15 @@ static pthread_cond_t awakener = PTHREAD_COND_INITIALIZER; static void task_free(struct validation_task *task) { - rpki_certificate_free(task->ca); + switch (task->type) { + case VTT_RPP: + rpki_certificate_free(task->u.ca); + break; + case VTT_TAL: + free(task->u.tal); + break; + } + free(task); } @@ -69,12 +77,18 @@ task_start(void) enabled = true; } -void +/* Returns true if the module had already been stopped. */ +bool task_stop(void) { + bool result; + mutex_lock(&lock); + result = !enabled; cleanup(); mutex_unlock(&lock); + + return result; } void @@ -84,13 +98,44 @@ task_teardown(void) pthread_cond_destroy(&awakener); } +static int +enqueue_task(struct validation_task *task) +{ + mutex_lock(&lock); + if (enabled) { + STAILQ_INSERT_TAIL(&waiting, task, lh); + task = NULL; + ntasks++; + } + mutex_unlock(&lock); + + if (task) { + task_free(task); /* Couldn't queue */ + return 0; + } + + return 1; +} + +unsigned int +task_enqueue_tal(char const *tal_path) +{ + struct validation_task *task; + + task = pmalloc(sizeof(struct validation_task)); + task->type = VTT_TAL; + task->u.tal = pstrdup(tal_path); + + return enqueue_task(task); +} + /* * Defers a task for later. * Call task_wakeup() once you've queued all your tasks. * Returns number of deferred tasks. */ unsigned int -task_enqueue(struct cache_mapping *map, struct rpki_certificate *parent) +task_enqueue_rpp(struct cache_mapping *map, struct rpki_certificate *parent) { struct validation_task *task; struct rpki_certificate *ca; @@ -104,22 +149,10 @@ task_enqueue(struct cache_mapping *map, struct rpki_certificate *parent) atomic_init(&ca->refcount, 1); task = pmalloc(sizeof(struct validation_task)); - task->ca = ca; - - mutex_lock(&lock); - if (enabled) { - STAILQ_INSERT_TAIL(&waiting, task, lh); - task = NULL; - ntasks++; - } - mutex_unlock(&lock); + task->type = VTT_RPP; + task->u.ca = ca; - if (task) { - task_free(task); /* Couldn't queue */ - return 0; - } - - return 1; + return enqueue_task(task); } /* Steals ownership of @task. */ @@ -198,7 +231,7 @@ task_dequeue(struct validation_task *prev) STAILQ_REMOVE_HEAD(&waiting, lh); mutex_unlock(&lock); pr_op_debug("task_dequeue(): Claimed task '%s'.", - task->ca->map.url); + task->u.ca->map.url); return task; } diff --git a/src/task.h b/src/task.h index b31251cc..f47d1498 100644 --- a/src/task.h +++ b/src/task.h @@ -6,17 +6,27 @@ #include "types/map.h" #include "object/certificate.h" +enum validation_task_type { + VTT_RPP, + VTT_TAL, +}; + struct validation_task { - struct rpki_certificate *ca; + enum validation_task_type type; + union { + char *tal; + struct rpki_certificate *ca; + } u; STAILQ_ENTRY(validation_task) lh; }; void task_setup(void); void task_start(void); -void task_stop(void); +bool task_stop(void); void task_teardown(void); -unsigned int task_enqueue(struct cache_mapping *, struct rpki_certificate *); +unsigned int task_enqueue_tal(char const *); +unsigned int task_enqueue_rpp(struct cache_mapping *, struct rpki_certificate *); void task_requeue_dormant(struct validation_task *); void task_wakeup(void); void task_wakeup_dormants(void); diff --git a/test/task_test.c b/test/task_test.c index bdd0d839..54290d0a 100644 --- a/test/task_test.c +++ b/test/task_test.c @@ -24,7 +24,7 @@ queue_1(char *mapstr) struct rpki_certificate parent = { 0 }; map.url = map.path = mapstr; - ck_assert_int_eq(1, task_enqueue(&map, &parent)); + ck_assert_int_eq(1, task_enqueue_rpp(&map, &parent)); } static struct validation_task * @@ -32,7 +32,7 @@ dequeue_1(char *mapstr, struct validation_task *prev) { struct validation_task *task; task = task_dequeue(prev); - ck_assert_str_eq(mapstr, task->ca->map.url); + ck_assert_str_eq(mapstr, task->u.ca->map.url); return task; } @@ -204,12 +204,12 @@ user_thread(void *arg) printf("th%d: Started.\n", thid); while ((task = task_dequeue(task)) != NULL) { - printf("- th%d: Dequeued '%s'\n", thid, task->ca->map.url); + printf("- th%d: Dequeued '%s'\n", thid, task->u.ca->map.url); total_dequeued++; - if (certificate_traverse_mock(task->ca, thid) == EBUSY) { + if (certificate_traverse_mock(task->u.ca, thid) == EBUSY) { printf("+ th%d: Requeuing '%s'\n", - thid, task->ca->map.url); + thid, task->u.ca->map.url); task_requeue_dormant(task); task = NULL; }