git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Add thread safety to the new cache
author Alberto Leiva Popper <ydahhrk@gmail.com>
Sat, 9 Nov 2024 01:00:05 +0000 (19:00 -0600)
committer Alberto Leiva Popper <ydahhrk@gmail.com>
Sat, 9 Nov 2024 01:02:20 +0000 (19:02 -0600)
The thread that handles the first task needing repository A refreshes A.

While that refresh is in progress, threads handling other tasks that also
need A postpone those tasks.

The postponed tasks wake up once the refresh completes.
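
The pattern, in miniature (a standalone sketch, not code from this commit;
repo_refresh() and worker() are illustrative stand-ins for do_refresh() and the
validation threads, and the condition variable stands in for the commit's
task_requeue_busy()/task_wakeup_busy() queue):

/*
 * Sketch of the OUTDATED -> ONGOING -> FRESH handshake described above.
 * One worker claims the download and performs it outside the lock; workers
 * that find ONGOING get EBUSY and postpone; the downloader broadcasts FRESH.
 */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

enum dl_state { DLS_OUTDATED, DLS_ONGOING, DLS_FRESH };

static enum dl_state state = DLS_OUTDATED;	/* "repository A" */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t fresh = PTHREAD_COND_INITIALIZER;

/* Returns 0 once A is FRESH, EBUSY if the caller should postpone its task. */
static int repo_refresh(int id)
{
	int do_download = 0;

	pthread_mutex_lock(&lock);
	switch (state) {
	case DLS_OUTDATED:
		state = DLS_ONGOING;	/* This thread claims the download */
		do_download = 1;
		break;
	case DLS_ONGOING:
		pthread_mutex_unlock(&lock);
		return EBUSY;		/* Someone else is refreshing A */
	case DLS_FRESH:
		break;
	}
	pthread_mutex_unlock(&lock);

	if (do_download) {
		printf("worker %d: refreshing repository A...\n", id);
		sleep(1);		/* Pretend rsync/RRDP runs here, lock released */

		pthread_mutex_lock(&lock);
		state = DLS_FRESH;
		pthread_cond_broadcast(&fresh);	/* Kickstart postponed tasks */
		pthread_mutex_unlock(&lock);
	}

	return 0;
}

static void *worker(void *arg)
{
	int id = *(int *)arg;

	while (repo_refresh(id) == EBUSY) {
		/* Postponed: wait until the refresher announces FRESH. */
		pthread_mutex_lock(&lock);
		while (state != DLS_FRESH)
			pthread_cond_wait(&fresh, &lock);
		pthread_mutex_unlock(&lock);
	}

	printf("worker %d: repository A is fresh; validating.\n", id);
	return NULL;
}

int main(void)
{
	pthread_t thr[4];
	int ids[4];
	int i;

	for (i = 0; i < 4; i++) {
		ids[i] = i;
		pthread_create(&thr[i], NULL, worker, &ids[i]);
	}
	for (i = 0; i < 4; i++)
		pthread_join(thr[i], NULL);
	return 0;
}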

src/cache.c
src/cache.h
src/common.c
src/object/certificate.c
src/object/tal.c
src/rrdp.c
src/rrdp.h
src/task.c
src/task.h
test/cache_test.c
test/task_test.c

index a3e6359d63c7d4b37c982e60e33d10c334ca8935..b56c33405f6446c4bcde894cf9c367d7a06dd32f 100644 (file)
 #include "log.h"
 #include "rrdp.h"
 #include "rsync.h"
+#include "task.h"
 #include "types/array.h"
 #include "types/path.h"
 #include "types/url.h"
 #include "types/uthash.h"
 
+enum dl_state {
+       DLS_OUTDATED = 0,       /* Not downloaded yet */
+       DLS_ONGOING,            /* Download in progress */
+       DLS_FRESH,              /* Download complete */
+};
+
+/*
+ * This is a delicate structure; pay attention.
+ *
+ * During the multithreaded stage of the validation cycle, the entire cache_node
+ * (except @hh) becomes (effectively) constant when @state becomes DLS_FRESH.
+ *
+ * The cache (ie. this module) only hands the node to the validation code
+ * (through cache_cage) when @state becomes DLS_FRESH.
+ *
+ * This is intended to allow the validation code to read the remaining fields
+ * (all except @hh) without having to hold the table mutex.
+ *
+ * C cannot entirely ensure the node remains constant after it's handed outside;
+ * this must be done through careful coding and review.
+ */
 struct cache_node {
        struct cache_mapping map;
 
-       /* XXX change to boolean? */
-       int fresh;              /* Refresh already attempted? */
+       enum dl_state state;
        int dlerr;              /* Result code of recent download attempt */
        time_t mtim;            /* Last successful download time, or zero */
 
@@ -40,12 +61,20 @@ struct cache_node {
 
 typedef int (*dl_cb)(struct cache_node *rpp);
 
+/*
+ * When concurrency is at play, you need @lock to access @nodes and @seq.
+ * @name, @enabled and @download stay constant through the validation.
+ *
+ * @lock also protects the nodes' @state and @hh, which have additional rules.
+ * (See cache_node.)
+ */
 struct cache_table {
        char *name;
        bool enabled;
        struct cache_sequence seq;
        struct cache_node *nodes; /* Hash Table */
        dl_cb download;
+       pthread_mutex_t lock;
 };
 
 static struct rpki_cache {
@@ -63,8 +92,8 @@ static struct rpki_cache {
 static volatile sig_atomic_t lockfile_owned;
 
 struct cache_cage {
-       struct cache_node *refresh;
-       struct cache_node *fallback;
+       struct cache_node const *refresh;
+       struct cache_node const *fallback;
 };
 
 struct cache_commit {
@@ -75,6 +104,7 @@ struct cache_commit {
 };
 
 static STAILQ_HEAD(cache_commits, cache_commit) commits = STAILQ_HEAD_INITIALIZER(commits);
+static pthread_mutex_t commits_lock = PTHREAD_MUTEX_INITIALIZER;
 
 #define LOCKFILE ".lock"
 #define INDEX_FILE "index.json"
@@ -163,6 +193,8 @@ init_table(struct cache_table *tbl, char *name, bool enabled, dl_cb dl)
        tbl->enabled = enabled;
        cseq_init(&tbl->seq, name, false);
        tbl->download = dl;
+       panic_on_fail(pthread_mutex_init(&tbl->lock, NULL),
+           "pthread_mutex_init");
 }
 
 static void
@@ -608,6 +640,7 @@ dl_http(struct cache_node *file)
        return 0;
 }
 
+/* Caller must lock @tbl->lock */
 static struct cache_node *
 find_node(struct cache_table *tbl, char const *url, size_t urlen)
 {
@@ -640,38 +673,75 @@ provide_node(struct cache_table *tbl, char const *url)
        return node;
 }
 
-/* @uri is either a caRepository or a rpkiNotify */
-static struct cache_node *
-do_refresh(struct cache_table *tbl, char const *uri)
+/*
+ * @uri is either a caRepository or a rpkiNotify
+ * By contract, only sets @result on return 0.
+ * By contract, @result->state will be DLS_FRESH on return 0.
+ */
+static int
+do_refresh(struct cache_table *tbl, char const *uri, struct cache_node **result)
 {
        struct cache_node *node;
+       bool downloaded = false;
 
        pr_val_debug("Trying %s (online)...", uri);
 
        if (!tbl->enabled) {
                pr_val_debug("Protocol disabled.");
-               return NULL;
+               return ESRCH;
        }
 
        if (tbl == &cache.rsync) {
                char *module = get_rsync_module(uri);
                if (module == NULL)
-                       return NULL;
+                       return EINVAL;
+               mutex_lock(&tbl->lock);
                node = provide_node(tbl, module);
                free(module);
        } else {
+               mutex_lock(&tbl->lock);
                node = provide_node(tbl, uri);
        }
-       if (!node)
-               return NULL;
+       if (!node) {
+               mutex_unlock(&tbl->lock);
+               return EINVAL;
+       }
+
+       switch (node->state) {
+       case DLS_OUTDATED:
+               node->state = DLS_ONGOING;
+               mutex_unlock(&tbl->lock);
 
-       if (!node->fresh) {
-               node->fresh = true;
                node->dlerr = tbl->download(node);
+               downloaded = true;
+
+               mutex_lock(&tbl->lock);
+               node->state = DLS_FRESH;
+               break;
+       case DLS_ONGOING:
+               mutex_unlock(&tbl->lock);
+               pr_val_debug("Refresh ongoing.");
+               return EBUSY;
+       case DLS_FRESH:
+               break;
+       default:
+               pr_crit("Unknown node state: %d", node->state);
        }
 
-       pr_val_debug(node->dlerr ? "Refresh failed." : "Refresh succeeded.");
-       return node;
+       mutex_unlock(&tbl->lock);
+       /* node->state is guaranteed to be DLS_FRESH at this point. */
+
+       if (downloaded) /* Kickstart tasks that fell into DLS_ONGOING */
+               task_wakeup_busy();
+
+       if (node->dlerr != 0) {
+               pr_val_debug("Refresh failed.");
+               return node->dlerr;
+       }
+
+       pr_val_debug("Refresh succeeded.");
+       *result = node;
+       return 0;
 }
 
 static struct cache_node *
@@ -688,29 +758,32 @@ get_fallback(char const *caRepository)
 
 /* Do not free nor modify the result. */
 char *
-cache_refresh_url(char const *url)
+cache_refresh_by_url(char const *url)
 {
        struct cache_node *node = NULL;
 
-       // XXX mutex
        // XXX review result signs
        // XXX Normalize @url
 
        if (url_is_https(url))
-               node = do_refresh(&cache.https, url);
+               do_refresh(&cache.https, url, &node);
        else if (url_is_rsync(url))
-               node = do_refresh(&cache.rsync, url);
+               do_refresh(&cache.rsync, url, &node);
 
-       // XXX Maybe strdup path so the caller can't corrupt our string
-       return (node && !node->dlerr) ? node->map.path : NULL;
+       return node ? node->map.path : NULL;
 }
 
 /* Do not free nor modify the result. */
 char *
-cache_fallback_url(char const *url)
+cache_get_fallback(char const *url)
 {
        struct cache_node *node;
 
+       /*
+        * The fallback table is read-only until the cleanup.
+        * Mutex not needed here.
+        */
+
        pr_val_debug("Trying %s (offline)...", url);
 
        node = find_node(&cache.fallback, url, strlen(url));
@@ -729,44 +802,53 @@ cache_fallback_url(char const *url)
  * XXX Need to normalize the sias.
  * XXX Fallback only if parent is fallback
  */
-struct cache_cage *
-cache_refresh_sias(struct sia_uris *sias)
+int
+cache_refresh_by_sias(struct sia_uris *sias, struct cache_cage **result)
 {
-       struct cache_cage *cage;
        struct cache_node *node;
+       struct cache_cage *cage;
 
        // XXX Make sure somewhere validates rpkiManifest matches caRepository.
-       // XXX mutex
        // XXX review result signs
        // XXX normalize rpkiNotify & caRepository?
 
-       cage = pzalloc(sizeof(struct cache_cage));
-       cage->fallback = get_fallback(sias->caRepository);
-
+       /* Try RRDP + optional fallback */
        if (sias->rpkiNotify) {
-               node = do_refresh(&cache.rrdp, sias->rpkiNotify);
-               if (node && !node->dlerr) {
-                       cage->refresh = node;
-                       return cage; /* RRDP + optional fallback happy path */
+               switch (do_refresh(&cache.rrdp, sias->rpkiNotify, &node)) {
+               case 0:
+                       goto refresh_success;
+               case EBUSY:
+                       return EBUSY;
                }
        }
 
-       node = do_refresh(&cache.rsync, sias->caRepository);
-       if (node && !node->dlerr) {
-               cage->refresh = node;
-               return cage; /* rsync + optional fallback happy path */
+       /* Try rsync + optional fallback */
+       switch (do_refresh(&cache.rsync, sias->caRepository, &node)) {
+       case 0:
+               goto refresh_success;
+       case EBUSY:
+               return EBUSY;
        }
 
-       if (cage->fallback == NULL) {
-               free(cage);
-               return NULL;
-       }
+       /* Try fallback only */
+       node = get_fallback(sias->caRepository); /* XXX (test) does this catch notifies? */
+       if (!node)
+               return EINVAL; /* Nothing to work with */
 
-       return cage; /* fallback happy path */
+       *result = cage = pmalloc(sizeof(struct cache_cage));
+       cage->refresh = NULL;
+       cage->fallback = node;
+       return 0;
+
+refresh_success:
+       *result = cage = pmalloc(sizeof(struct cache_cage));
+       cage->refresh = node;
+       cage->fallback = get_fallback(sias->caRepository);
+       return 0;
 }
 
-char const *
-node2file(struct cache_node *node, char const *url)
+static char const *
+node2file(struct cache_node const *node, char const *url)
 {
        if (node == NULL)
                return NULL;
@@ -779,6 +861,12 @@ node2file(struct cache_node *node, char const *url)
 char const *
 cage_map_file(struct cache_cage *cage, char const *url)
 {
+       /*
+        * Remember: In addition to honoring the consts of cache->refresh and
+        * cache->fallback, anything these structures point to MUST NOT be
+        * modified either.
+        */
+
        char const *file;
 
        file = node2file(cage->refresh, url);
@@ -792,6 +880,12 @@ cage_map_file(struct cache_cage *cage, char const *url)
 bool
 cage_disable_refresh(struct cache_cage *cage)
 {
+       /*
+        * Remember: In addition to honoring the consts of cache->refresh and
+        * cache->fallback, anything these structures point to MUST NOT be
+        * modified either.
+        */
+
        bool enabled = (cage->refresh != NULL);
        cage->refresh = NULL;
 
@@ -822,12 +916,16 @@ cache_commit_rpp(char const *caRepository, struct rpp *rpp)
        commit->caRepository = pstrdup(caRepository);
        commit->files = rpp->files;
        commit->nfiles = rpp->nfiles;
+
+       mutex_lock(&commits_lock);
        STAILQ_INSERT_TAIL(&commits, commit, lh);
+       mutex_unlock(&commits_lock);
 
        rpp->files = NULL;
        rpp->nfiles = 0;
 }
 
+/* XXX not called */
 void
 cache_commit_file(struct cache_mapping *map)
 {
@@ -840,7 +938,10 @@ cache_commit_file(struct cache_mapping *map)
        commit->files[0].url = pstrdup(map->url);
        commit->files[0].path = pstrdup(map->path);
        commit->nfiles = 1;
+
+       mutex_lock(&commits_lock);
        STAILQ_INSERT_TAIL(&commits, commit, lh);
+       mutex_unlock(&commits_lock);
 }
 
 static void
@@ -850,10 +951,17 @@ cachent_print(struct cache_node *node)
                return;
 
        printf("\t%s (%s): ", node->map.url, node->map.path);
-       if (node->fresh)
-               printf("fresh (errcode %d)", node->dlerr);
-       else
+       switch (node->state) {
+       case DLS_OUTDATED:
                printf("stale");
+               break;
+       case DLS_ONGOING:
+               printf("downloading");
+               break;
+       case DLS_FRESH:
+               printf("fresh (errcode %d)", node->dlerr);
+               break;
+       }
        printf("\n");
 }
 
@@ -1093,7 +1201,7 @@ commit_fallbacks(void)
                                    strerror(errno));
                }
 
-freshen:       fb->fresh = 1;
+freshen:       fb->state = DLS_FRESH;
 skip:          free(commit->caRepository);
                for (i = 0; i < commit->nfiles; i++) {
                        free(commit->files[i].url);
@@ -1104,7 +1212,7 @@ skip:             free(commit->caRepository);
        }
 
        HASH_ITER(hh, cache.fallback.nodes, fb, tmp) {
-               if (fb->fresh)
+               if (fb->state == DLS_FRESH)
                        continue;
 
                /*
index 24d1ea7d55b3a178040bc31b56b6c1ae71bf9c5f..c791346a7e53e58839c51e95cb1764761b4b6d63 100644 (file)
@@ -37,11 +37,11 @@ struct sia_uris {
 void sias_init(struct sia_uris *);
 void sias_cleanup(struct sia_uris *);
 
-char *cache_refresh_url(char const *);
-char *cache_fallback_url(char const *);
+char *cache_refresh_by_url(char const *);
+char *cache_get_fallback(char const *);
 
 struct cache_cage;
-struct cache_cage *cache_refresh_sias(struct sia_uris *);
+int cache_refresh_by_sias(struct sia_uris *, struct cache_cage **);
 char const *cage_map_file(struct cache_cage *, char const *);
 bool cage_disable_refresh(struct cache_cage *);
 void cache_commit_rpp(char const *, struct rpp *);
index 1f7d7e6809a2b4282cdca3559fa4a0da83cb7673..03a5c7629032c5e0f35ed2c448795f89f2f0ecb0 100644 (file)
@@ -32,7 +32,9 @@ void
 panic_on_fail(int error, char const *function_name)
 {
        if (error)
-               pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.",
+               pr_crit("%s() returned error code %d. "
+                   "This is too critical for a graceful recovery; "
+                   "I must die now.",
                    function_name, error);
 }
 
index 9de21803e77eb23fdf62cc75be03e734d3ee1f1a..99fd12852e90e7100ebae98a1c17803d715080f4 100644 (file)
@@ -1900,18 +1900,25 @@ certificate_traverse(struct rpki_certificate *ca)
        unsigned int queued;
        int error;
 
-       error = certificate_validate(ca);
-       if (error)
-               return error;
+       if (!ca->x509) {
+               error = certificate_validate(ca);
+               if (error)
+                       return error;
 
-       if (ca->type != CERTYPE_TA && ca->type != CERTYPE_CA)
-               return 0;
+               if (ca->type != CERTYPE_TA && ca->type != CERTYPE_CA)
+                       return 0;
+       } /* else "we already did this, and returned EBUSY" */
 
-       cage = cache_refresh_sias(&ca->sias);
-       if (!cage)
+       switch (cache_refresh_by_sias(&ca->sias, &cage)) {
+       case 0:
+               break;
+       case EBUSY:
+               return EBUSY;
+       default:
                return pr_val_err("caRepository '%s' could not be refreshed, "
                    "and there is no fallback in the cache. "
                    "I'm going to have to skip it.", ca->sias.caRepository);
+       }
 
 retry: mft = cage_map_file(cage, ca->sias.rpkiManifest);
        if (!mft) {
index 084412ea3d5aed7938fad9ca09c6452941dd1211..5c33c99db6cf54a2e128b24eb3b343d305c4c144 100644 (file)
@@ -169,7 +169,7 @@ traverse_tal(char const *tal_path, void *arg)
        /* Online attempts */
        ARRAYLIST_FOREACH(&tal.urls, url) {
                map.url = *url;
-               map.path = cache_refresh_url(*url);
+               map.path = cache_refresh_by_url(*url);
                if (!map.path)
                        continue;
                if (validate_ta(&tal, &map) != 0)
@@ -180,7 +180,7 @@ traverse_tal(char const *tal_path, void *arg)
        /* Offline fallback attempts */
        ARRAYLIST_FOREACH(&tal.urls, url) {
                map.url = *url;
-               map.path = cache_fallback_url(*url);
+               map.path = cache_get_fallback(*url);
                if (!map.path)
                        continue;
                if (validate_ta(&tal, &map) != 0)
@@ -201,8 +201,12 @@ pick_up_work(void *arg)
 {
        struct validation_task *task = NULL;
 
-       while ((task = task_dequeue(task)) != NULL)
-               certificate_traverse(task->ca);
+       while ((task = task_dequeue(task)) != NULL) {
+               if (certificate_traverse(task->ca) == EBUSY) {
+                       task_requeue_busy(task);
+                       task = NULL;
+               }
+       }
 
        return NULL;
 }
@@ -211,7 +215,6 @@ int
 perform_standalone_validation(void)
 {
        pthread_t threads[5]; // XXX variabilize
-       unsigned int ids[5];
        array_index t, t2;
        int error;
 
@@ -225,8 +228,7 @@ perform_standalone_validation(void)
                goto end;
 
        for (t = 0; t < 5; t++) {
-               ids[t] = t;
-               error = pthread_create(&threads[t], NULL, pick_up_work, &ids[t]);
+               error = pthread_create(&threads[t], NULL, pick_up_work, NULL);
                if (error) {
                        pr_op_err("Could not spawn validation thread %zu: %s",
                            t, strerror(error));
index a42e4c6fcf1726f2a0596130eee922072276ab56..fee1de1b234e7b64e3502b7b7caceabb37ef1737 100644 (file)
@@ -143,7 +143,7 @@ session_cleanup(struct rrdp_session *meta)
 }
 
 static struct cache_file *
-state_find_file(struct rrdp_state *state, char const *url, size_t len)
+state_find_file(struct rrdp_state const *state, char const *url, size_t len)
 {
        struct cache_file *file;
        HASH_FIND(hh, state->files, url, len, file);
@@ -1276,7 +1276,7 @@ end:      fnstack_pop();
 }
 
 char const *
-rrdp_file(struct rrdp_state *state, char const *url)
+rrdp_file(struct rrdp_state const *state, char const *url)
 {
        struct cache_file *file;
        file = state_find_file(state, url, strlen(url));
index 804bbe1a051a5dc135cec7238421bfa476dd74fe..4d7126fcccf711cf1862246b8da2aadb85aa2a85 100644 (file)
@@ -12,7 +12,7 @@ struct rrdp_state;
 
 int rrdp_update(struct cache_mapping const *, time_t, bool *,
     struct cache_sequence *, struct rrdp_state **);
-char const *rrdp_file(struct rrdp_state *, char const *);
+char const *rrdp_file(struct rrdp_state const *, char const *);
 
 char const *rrdp_create_fallback(char *, struct rrdp_state **, char const *);
 
index 555f75161dc3d8e5170ae450b2719a9b6cc8af68..6fab1945d9c56f0da00fc65b2604b9a8773476ff 100644 (file)
@@ -6,9 +6,13 @@
 #include "common.h"
 #include "log.h"
 
+STAILQ_HEAD(validation_tasks, validation_task);
+
 /* Queued, not yet claimed tasks */
-static STAILQ_HEAD(validation_tasks, validation_task) tasks;
-/* Active tasks (length of @tasks plus number of running tasks) */
+static struct validation_tasks tasks;
+/* Queued, but not yet available for claiming */
+static struct validation_tasks busy;
+/* Active tasks (length(@tasks) + length(@busy) + number of running tasks) */
 static int active;
 
 static bool enabled = true;
@@ -27,6 +31,7 @@ void
 task_setup(void)
 {
        STAILQ_INIT(&tasks);
+       STAILQ_INIT(&busy);
        active = 0;
        enabled = true;
        panic_on_fail(pthread_mutex_init(&lock, NULL), "pthread_mutex_init");
@@ -34,19 +39,26 @@ task_setup(void)
 }
 
 static void
-cleanup(void)
+cleanup_tasks(struct validation_tasks *tasks)
 {
        struct validation_task *task;
 
-       enabled = false;
-       active = 0;
-       while (!STAILQ_EMPTY(&tasks)) {
-               task = STAILQ_FIRST(&tasks);
-               STAILQ_REMOVE_HEAD(&tasks, lh);
+       while (!STAILQ_EMPTY(tasks)) {
+               task = STAILQ_FIRST(tasks);
+               STAILQ_REMOVE_HEAD(tasks, lh);
                task_free(task);
        }
 }
 
+static void
+cleanup(void)
+{
+       enabled = false;
+       active = 0;
+       cleanup_tasks(&tasks);
+       cleanup_tasks(&busy);
+}
+
 void
 task_start(void)
 {
@@ -91,19 +103,35 @@ task_enqueue(struct cache_mapping *map, struct rpki_certificate *parent)
        task->ca = ca;
 
        mutex_lock(&lock);
-
        if (enabled) {
                STAILQ_INSERT_TAIL(&tasks, task, lh);
+               task = NULL;
                active++;
-       } else {
-               task_free(task);
        }
-
        mutex_unlock(&lock);
 
+       if (task) {
+               task_free(task); /* Couldn't queue */
+               return 0;
+       }
+
        return 1;
 }
 
+void
+task_requeue_busy(struct validation_task *task)
+{
+       mutex_lock(&lock);
+       if (enabled) {
+               STAILQ_INSERT_TAIL(&busy, task, lh);
+               task = NULL;
+       }
+       mutex_unlock(&lock);
+
+       if (task)
+               task_free(task); /* Couldn't queue */
+}
+
 /* Wakes up threads currently waiting for tasks. */
 void
 task_wakeup(void)
@@ -114,10 +142,21 @@ task_wakeup(void)
        mutex_unlock(&lock);
 }
 
+void
+task_wakeup_busy(void)
+{
+       mutex_lock(&lock);
+       STAILQ_CONCAT(&tasks, &busy);
+       panic_on_fail(pthread_cond_broadcast(&awakener),
+           "pthread_cond_broadcast");
+       mutex_unlock(&lock);
+}
+
 /*
  * Frees the @prev previous task, and returns the next one.
  *
- * If no task is available yet, will sleep until someone calls task_wakeup().
+ * If no task is available yet, will sleep until someone calls task_wakeup() or
+ * task_wakeup_busy().
  * If all the tasks are done, returns NULL.
  *
  * Assumes at least one task has been queued before the first dequeue.
index 155e8e416405c0a1abcaa2569f38cbd5147a27b3..356cf53f63e338f9063a301cb1f32c8169717c92 100644 (file)
@@ -17,7 +17,9 @@ void task_stop(void);
 void task_teardown(void);
 
 unsigned int task_enqueue(struct cache_mapping *, struct rpki_certificate *);
+void task_requeue_busy(struct validation_task *);
 void task_wakeup(void);
+void task_wakeup_busy(void);
 struct validation_task *task_dequeue(struct validation_task *);
 
 #endif /* SRC_TASK_H_ */
index 5ba727bca50ccb6b58eab727765268c5e3975222..6eef59e47cfe356ea2e3043792f9ec2194a44fcf 100644 (file)
@@ -48,6 +48,7 @@ rsync_download(char const *url, char const *path)
 }
 
 MOCK_VOID(__delete_node_cb, struct cache_node const *node)
+MOCK_VOID(task_wakeup_busy, void)
 
 /* Helpers */
 
@@ -61,15 +62,15 @@ setup_test(void)
 }
 
 static struct cache_cage *
-run_dl_rsync(char *caRepository, unsigned int expected_calls)
+run_dl_rsync(char *caRepository, int expected_err, unsigned int expected_calls)
 {
        struct sia_uris sias = { .caRepository = caRepository };
-       struct cache_cage *cage;
+       struct cache_cage *cage = NULL;
 
        rsync_counter = 0;
        https_counter = 0;
        printf("---- Downloading... ----\n");
-       cage = cache_refresh_sias(&sias);
+       ck_assert_int_eq(expected_err, cache_refresh_by_sias(&sias, &cage));
        printf("---- Downloaded. ----\n");
        ck_assert_uint_eq(expected_calls, rsync_counter);
        ck_assert_uint_eq(0, https_counter);
@@ -86,13 +87,13 @@ run_dl_https(char const *url, unsigned int expected_calls,
        rsync_counter = 0;
        https_counter = 0;
        printf("---- Downloading... ----\n");
-       result = cache_refresh_url(url);
+       result = cache_refresh_by_url(url);
        printf("---- Downloaded. ----\n");
        ck_assert_uint_eq(0, rsync_counter);
        ck_assert_uint_eq(expected_calls, https_counter);
 
        ck_assert_str(expected_result, result);
-       ck_assert_str(NULL, cache_fallback_url(url));
+       ck_assert_str(NULL, cache_get_fallback(url));
 }
 
 
@@ -100,7 +101,7 @@ static void
 ck_cage(struct cache_cage *cage, char const *url,
     char const *refresh, char const *fallback)
 {
-       struct cache_node *bkp;
+       struct cache_node const *bkp;
 
        ck_assert_str(refresh, cage_map_file(cage, url));
 
@@ -215,7 +216,7 @@ init_node_rsync(struct cache_node *node, char *url, char *path,
 {
        node->map.url = url;
        node->map.path = path;
-       node->fresh = fresh;
+       node->state = fresh ? DLS_FRESH : DLS_OUTDATED; /* XXX (test) */
        node->dlerr = dlerr;
        node->rrdp = NULL;
 }
@@ -226,7 +227,7 @@ init_node_https(struct cache_node *node, char *url, char *path,
 {
        node->map.url = url;
        node->map.path = path;
-       node->fresh = fresh;
+       node->state = fresh ? DLS_FRESH : DLS_OUTDATED;
        node->dlerr = dlerr;
        node->rrdp = NULL;
 }
@@ -236,7 +237,7 @@ ck_cache_node_eq(struct cache_node *expected, struct cache_node *actual)
 {
        ck_assert_str_eq(expected->map.url, actual->map.url);
        ck_assert_str_eq(expected->map.path, actual->map.path);
-       ck_assert_int_eq(expected->fresh, actual->fresh);
+       ck_assert_int_eq(expected->state, actual->state);
        ck_assert_int_eq(expected->dlerr, actual->dlerr);
        if (expected->rrdp == NULL)
                ck_assert_ptr_eq(expected->rrdp, actual->rrdp);
@@ -298,7 +299,7 @@ static time_t epoch;
 static void
 unfreshen(struct cache_table *tbl, struct cache_node *node)
 {
-       node->fresh = 0;
+       node->state = DLS_OUTDATED;
 }
 
 static int
@@ -347,7 +348,7 @@ START_TEST(test_cache_download_rsync)
        setup_test();
 
        printf("==== Startup ====\n");
-       cage = run_dl_rsync("rsync://a.b.c/d", 1);
+       cage = run_dl_rsync("rsync://a.b.c/d", 0, 1);
        ck_assert_ptr_ne(NULL, cage);
        ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL);
        ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL);
@@ -356,7 +357,7 @@ START_TEST(test_cache_download_rsync)
        free(cage);
 
        printf("==== Redownload same file, nothing should happen ====\n");
-       cage = run_dl_rsync("rsync://a.b.c/d", 0);
+       cage = run_dl_rsync("rsync://a.b.c/d", 0, 0);
        ck_assert_ptr_ne(NULL, cage);
        ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL);
        ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL);
@@ -368,7 +369,7 @@ START_TEST(test_cache_download_rsync)
         * download d, we needn't bother redownloading d/e.
         */
        printf("==== Don't redownload child ====\n");
-       cage = run_dl_rsync("rsync://a.b.c/d/e", 0);
+       cage = run_dl_rsync("rsync://a.b.c/d/e", 0, 0);
        ck_assert_ptr_ne(NULL, cage);
        ck_cage(cage, "rsync://a.b.c/d", "rsync/0", NULL);
        ck_cage(cage, "rsync://a.b.c/d/e/f.cer", "rsync/0/e/f.cer", NULL);
@@ -382,7 +383,7 @@ START_TEST(test_cache_download_rsync)
         * and there would be consequences for violating it.
         */
        printf("==== rsync truncated ====\n");
-       cage = run_dl_rsync("rsync://x.y.z/m/n/o", 1);
+       cage = run_dl_rsync("rsync://x.y.z/m/n/o", 0, 1);
        ck_assert_ptr_ne(NULL, cage);
        ck_cage(cage, "rsync://x.y.z/m", "rsync/1", NULL);
        ck_cage(cage, "rsync://x.y.z/m/n/o", "rsync/1/n/o", NULL);
@@ -391,7 +392,7 @@ START_TEST(test_cache_download_rsync)
        free(cage);
 
        printf("==== Sibling ====\n");
-       cage = run_dl_rsync("rsync://a.b.c/e/f", 1);
+       cage = run_dl_rsync("rsync://a.b.c/e/f", 0, 1);
        ck_assert_ptr_ne(NULL, cage);
        ck_cage(cage, "rsync://a.b.c/e", "rsync/2", NULL);
        ck_cage(cage, "rsync://a.b.c/e/f/x/y/z", "rsync/2/f/x/y/z", NULL);
@@ -414,17 +415,17 @@ START_TEST(test_cache_download_rsync_error)
 
        printf("==== Startup ====\n");
        dl_error = 0;
-       free(run_dl_rsync("rsync://a.b.c/d", 1));
+       free(run_dl_rsync("rsync://a.b.c/d", 0, 1));
        dl_error = EINVAL;
-       ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 1));
+       ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 1));
        ck_cache_rsync(nodes);
 
        printf("==== Regardless of error, not reattempted because same iteration ====\n");
        dl_error = EINVAL;
-       ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 0));
+       ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 0));
        ck_cache_rsync(nodes);
        dl_error = 0;
-       ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", 0));
+       ck_assert_ptr_eq(NULL, run_dl_rsync("rsync://a.b.c/e", EINVAL, 0));
        ck_cache_rsync(nodes);
 
        cleanup_test();
index fe322e9316d80a72f0e70bb8a2188a8c0bbc5b82..c2e343921fa58002f1362e80cc5395482104b745 100644 (file)
@@ -5,6 +5,7 @@
 #include "alloc.c"
 #include "common.c"
 #include "mock.c"
+#include "types/array.h"
 #include "types/map.c"
 #include "types/path.c"
 
@@ -130,6 +131,170 @@ START_TEST(test_queue_interrupted)
 }
 END_TEST
 
+#define TEST_TASKS 3000
+
+struct test_task {
+       char id[8];
+       STAILQ_ENTRY(test_task) lh;
+};
+
+static STAILQ_HEAD(test_task_list, test_task) test_tasks;
+static pthread_mutex_t test_tasks_lock = PTHREAD_MUTEX_INITIALIZER;
+static bool return_busy;
+
+static void
+populate_test_tasks(void)
+{
+       struct test_task *task;
+       int printed;
+       unsigned int i;
+
+       STAILQ_INIT(&test_tasks);
+       for (i = 0; i < TEST_TASKS; i++) {
+               task = pmalloc(sizeof(struct test_task));
+               printed = snprintf(task->id, sizeof(task->id), "%u", i);
+               ck_assert_int_gt(printed, 0);
+               ck_assert_int_lt(printed, sizeof(task->id));
+               STAILQ_INSERT_TAIL(&test_tasks, task, lh);
+       }
+
+       printf("+ th-1: Queuing 'starter'\n");
+       queue_1("starter");
+}
+
+static int
+certificate_traverse_mock(struct rpki_certificate *ca, int thid)
+{
+       struct test_task *new[10];
+       unsigned int n;
+
+       /* Queue 10 of the available tasks for each dequeue */
+
+       mutex_lock(&test_tasks_lock);
+       for (n = 0; n < 10; n++) {
+               new[n] = STAILQ_FIRST(&test_tasks);
+               if (new[n])
+                       STAILQ_REMOVE_HEAD(&test_tasks, lh);
+       }
+       mutex_unlock(&test_tasks_lock);
+
+       for (n = 0; n < 10; n++) {
+               if (!new[n])
+                       break;
+               printf("+ th%d: Queuing '%s'\n", thid, new[n]->id);
+               queue_1(new[n]->id);
+               free(new[n]);
+       }
+
+       if (n != 0)
+               task_wakeup();
+
+       if (return_busy && (rand() & 3) == 0)
+               return EBUSY; /* Return "busy" 25% of the time */
+
+       return 0;
+}
+
+static void *
+user_thread(void *arg)
+{
+       int thid = *((int *)arg);
+       struct validation_task *task = NULL;
+       int total_dequeued = 0;
+
+       printf("th%d: Started.\n", thid);
+
+       while ((task = task_dequeue(task)) != NULL) {
+               printf("- th%d: Dequeued '%s'\n", thid, task->ca->map.url);
+               total_dequeued++;
+
+               if (certificate_traverse_mock(task->ca, thid) == EBUSY) {
+                       printf("+ th%d: Requeuing '%s'\n",
+                           thid, task->ca->map.url);
+                       task_requeue_busy(task);
+                       task = NULL;
+               }
+       }
+
+       printf("th%d: Dequeued %u times.\n", thid, total_dequeued);
+       return NULL;
+}
+
+static void
+run_threads(void)
+{
+       pthread_t threads[10];
+       int thids[10];
+       unsigned int i;
+
+       for (i = 0; i < ARRAY_LEN(threads); i++) {
+               thids[i] = i;
+               ck_assert_int_eq(0, pthread_create(&threads[i], NULL,
+                   user_thread, &thids[i]));
+       }
+
+       for (i = 0; i < ARRAY_LEN(threads); i++)
+               ck_assert_int_eq(0, pthread_join(threads[i], NULL));
+
+       ck_assert_int_eq(1, STAILQ_EMPTY(&test_tasks));
+       ck_assert_ptr_eq(NULL, task_dequeue(NULL));
+}
+
+START_TEST(test_queue_multiuser)
+{
+       return_busy = false;
+
+       task_setup();
+       task_start();
+
+       populate_test_tasks();
+       run_threads();
+
+       task_stop();
+       task_teardown();
+}
+END_TEST
+
+static void *
+release_busies(void *arg)
+{
+       unsigned int i;
+
+       for (i = 0; i < 2; i++) {
+               sleep(1);
+               printf("Waking up busy tasks!\n");
+               task_wakeup_busy();
+       }
+
+       sleep(1);
+       return_busy = false;
+       printf("Waking up busy tasks for the last time!\n");
+       task_wakeup_busy();
+
+       return NULL;
+}
+
+START_TEST(test_queue_multiuser_busy)
+{
+       pthread_t thr;
+
+       return_busy = true;
+
+       task_setup();
+       task_start();
+
+       ck_assert_int_eq(0, pthread_create(&thr, NULL, release_busies, NULL));
+
+       populate_test_tasks();
+       run_threads();
+
+       ck_assert_int_eq(0, pthread_join(thr, NULL));
+
+       task_stop();
+       task_teardown();
+}
+END_TEST
+
 static Suite *create_suite(void)
 {
        Suite *suite;
@@ -141,6 +306,8 @@ static Suite *create_suite(void)
        tcase_add_test(queue, test_queue_3);
        tcase_add_test(queue, test_queue_multiple);
        tcase_add_test(queue, test_queue_interrupted);
+       tcase_add_test(queue, test_queue_multiuser);
+       tcase_add_test(queue, test_queue_multiuser_busy);
 
        suite = suite_create("task");
        suite_add_tcase(suite, queue);