Sequential TALs meant sequential TAs.
(eg. Fort would first download the AFRINIC TA, then the APNIC TA, then
the ARIN TA, etc).
Waste of time.
Now thread pool tasks can be one of two types: TAL or RPP.
This results in TA downloads being treated like any other download.
map = ca->rpp.files + i;
ext = map->url + strlen(map->url) - 4;
if (strcmp(ext, ".cer") == 0)
- queued += task_enqueue(map, ca);
+ queued += task_enqueue_rpp(map, ca);
else if (strcmp(ext, ".roa") == 0)
roa_traverse(map, ca);
else if (strcmp(ext, ".gbr") == 0)
*len = tal->spki_len;
}
+static int
+queue_tal(char const *tal_path, void *arg)
+{
+ if (task_enqueue_tal(tal_path) < 1) {
+ pr_op_err("Could not enqueue task '%s'; abandoning validation.",
+ tal_path);
+ return -1;
+ }
+
+ return 0;
+}
+
static int
validate_ta(struct tal *tal, struct cache_mapping const *ta_map)
{
}
static int
-traverse_tal(char const *tal_path, void *arg)
+traverse_tal(char const *tal_path)
{
struct tal tal;
char **url;
struct validation_task *task = NULL;
while ((task = task_dequeue(task)) != NULL) {
- if (certificate_traverse(task->ca) == EBUSY) {
- task_requeue_dormant(task);
- task = NULL;
+ switch (task->type) {
+ case VTT_RPP:
+ if (certificate_traverse(task->u.ca) == EBUSY) {
+ task_requeue_dormant(task);
+ task = NULL;
+ }
+ break;
+ case VTT_TAL:
+ if (traverse_tal(task->u.tal) != 0)
+ task_stop();
+ break;
}
+
}
return NULL;
perform_standalone_validation(void)
{
pthread_t threads[5]; // XXX variabilize
- array_index t, t2;
+ array_index t;
int error;
error = cache_prepare();
fnstack_init();
task_start();
- if (foreach_file(config_get_tal(), ".tal", true, traverse_tal, NULL)!=0)
+ error = foreach_file(config_get_tal(), ".tal", true, queue_tal, NULL);
+ if (error)
goto end;
for (t = 0; t < 5; t++) {
error = pthread_create(&threads[t], NULL, pick_up_work, NULL);
- if (error) {
- pr_op_err("Could not spawn validation thread %zu: %s",
+ if (error)
+ pr_crit("pthread_create(%zu) failed: %s",
t, strerror(error));
- break;
- }
}
- if (t == 0) {
- pick_up_work(NULL);
- error = 0;
- } else for (t2 = 0; t2 < t; t2++) {
- error = pthread_join(threads[t2], NULL);
+ for (t = 0; t < 5; t++) {
+ error = pthread_join(threads[t], NULL);
if (error)
pr_crit("pthread_join(%zu) failed: %s",
- t2, strerror(error));
+ t, strerror(error));
}
-end: task_stop();
+end: if (task_stop())
+ error = EINVAL; /* pick_up_work(), VTT_TAL */
fnstack_cleanup();
/*
* Commit even on failure, as there's no reason to throw away something
static void
task_free(struct validation_task *task)
{
- rpki_certificate_free(task->ca);
+ switch (task->type) {
+ case VTT_RPP:
+ rpki_certificate_free(task->u.ca);
+ break;
+ case VTT_TAL:
+ free(task->u.tal);
+ break;
+ }
+
free(task);
}
enabled = true;
}
-void
+/* Returns true if the module had already been stopped. */
+bool
task_stop(void)
{
+ bool result;
+
mutex_lock(&lock);
+ result = !enabled;
cleanup();
mutex_unlock(&lock);
+
+ return result;
}
void
pthread_cond_destroy(&awakener);
}
+static int
+enqueue_task(struct validation_task *task)
+{
+ mutex_lock(&lock);
+ if (enabled) {
+ STAILQ_INSERT_TAIL(&waiting, task, lh);
+ task = NULL;
+ ntasks++;
+ }
+ mutex_unlock(&lock);
+
+ if (task) {
+ task_free(task); /* Couldn't queue */
+ return 0;
+ }
+
+ return 1;
+}
+
+unsigned int
+task_enqueue_tal(char const *tal_path)
+{
+ struct validation_task *task;
+
+ task = pmalloc(sizeof(struct validation_task));
+ task->type = VTT_TAL;
+ task->u.tal = pstrdup(tal_path);
+
+ return enqueue_task(task);
+}
+
/*
* Defers a task for later.
* Call task_wakeup() once you've queued all your tasks.
* Returns number of deferred tasks.
*/
unsigned int
-task_enqueue(struct cache_mapping *map, struct rpki_certificate *parent)
+task_enqueue_rpp(struct cache_mapping *map, struct rpki_certificate *parent)
{
struct validation_task *task;
struct rpki_certificate *ca;
atomic_init(&ca->refcount, 1);
task = pmalloc(sizeof(struct validation_task));
- task->ca = ca;
-
- mutex_lock(&lock);
- if (enabled) {
- STAILQ_INSERT_TAIL(&waiting, task, lh);
- task = NULL;
- ntasks++;
- }
- mutex_unlock(&lock);
+ task->type = VTT_RPP;
+ task->u.ca = ca;
- if (task) {
- task_free(task); /* Couldn't queue */
- return 0;
- }
-
- return 1;
+ return enqueue_task(task);
}
/* Steals ownership of @task. */
STAILQ_REMOVE_HEAD(&waiting, lh);
mutex_unlock(&lock);
pr_op_debug("task_dequeue(): Claimed task '%s'.",
- task->ca->map.url);
+ task->u.ca->map.url);
return task;
}
#include "types/map.h"
#include "object/certificate.h"
+enum validation_task_type {
+ VTT_RPP,
+ VTT_TAL,
+};
+
struct validation_task {
- struct rpki_certificate *ca;
+ enum validation_task_type type;
+ union {
+ char *tal;
+ struct rpki_certificate *ca;
+ } u;
STAILQ_ENTRY(validation_task) lh;
};
void task_setup(void);
void task_start(void);
-void task_stop(void);
+bool task_stop(void);
void task_teardown(void);
-unsigned int task_enqueue(struct cache_mapping *, struct rpki_certificate *);
+unsigned int task_enqueue_tal(char const *);
+unsigned int task_enqueue_rpp(struct cache_mapping *, struct rpki_certificate *);
void task_requeue_dormant(struct validation_task *);
void task_wakeup(void);
void task_wakeup_dormants(void);
struct rpki_certificate parent = { 0 };
map.url = map.path = mapstr;
- ck_assert_int_eq(1, task_enqueue(&map, &parent));
+ ck_assert_int_eq(1, task_enqueue_rpp(&map, &parent));
}
static struct validation_task *
{
struct validation_task *task;
task = task_dequeue(prev);
- ck_assert_str_eq(mapstr, task->ca->map.url);
+ ck_assert_str_eq(mapstr, task->u.ca->map.url);
return task;
}
printf("th%d: Started.\n", thid);
while ((task = task_dequeue(task)) != NULL) {
- printf("- th%d: Dequeued '%s'\n", thid, task->ca->map.url);
+ printf("- th%d: Dequeued '%s'\n", thid, task->u.ca->map.url);
total_dequeued++;
- if (certificate_traverse_mock(task->ca, thid) == EBUSY) {
+ if (certificate_traverse_mock(task->u.ca, thid) == EBUSY) {
printf("+ th%d: Requeuing '%s'\n",
- thid, task->ca->map.url);
+ thid, task->u.ca->map.url);
task_requeue_dormant(task);
task = NULL;
}